This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1fad4a207aa [refactor](pipeline) Refactor non-pipeline code structure
(#35900)
1fad4a207aa is described below
commit 1fad4a207aa7d608dbd6ef51b3d551ebe3538e7e
Author: Gabriel <[email protected]>
AuthorDate: Wed Jun 5 19:48:17 2024 +0800
[refactor](pipeline) Refactor non-pipeline code structure (#35900)
---
be/src/pipeline/common/agg_utils.h | 340 +++++++++++++++++++
.../data_gen_functions/vdata_gen_function_inf.h | 4 +-
.../common}/data_gen_functions/vnumbers_tvf.cpp | 14 +-
.../common}/data_gen_functions/vnumbers_tvf.h | 6 +-
be/src/pipeline/common/join_utils.h | 68 ++++
.../common}/runtime_filter_consumer.cpp | 10 +-
.../common}/runtime_filter_consumer.h | 9 +-
be/src/pipeline/dependency.cpp | 20 +-
be/src/pipeline/dependency.h | 85 ++---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 38 +--
be/src/pipeline/exec/aggregation_sink_operator.h | 4 +-
.../pipeline/exec/aggregation_source_operator.cpp | 1 +
be/src/pipeline/exec/analytic_sink_operator.cpp | 1 +
be/src/pipeline/exec/analytic_source_operator.cpp | 1 +
be/src/pipeline/exec/datagen_operator.cpp | 7 +-
be/src/pipeline/exec/datagen_operator.h | 4 +-
.../distinct_streaming_aggregation_operator.cpp | 3 +-
.../exec/distinct_streaming_aggregation_operator.h | 2 +-
be/src/pipeline/exec/es_scan_operator.cpp | 14 +-
be/src/pipeline/exec/hashjoin_build_sink.cpp | 29 +-
be/src/pipeline/exec/hashjoin_build_sink.h | 60 +++-
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 5 +-
be/src/pipeline/exec/hashjoin_probe_operator.h | 24 +-
.../exec/join/cross_join_impl.cpp | 2 +-
.../exec/join/full_outer_join_impl.cpp | 2 +-
.../exec/join/inner_join_impl.cpp | 2 +-
be/src/{vec => pipeline}/exec/join/join_op.h | 14 +-
.../exec/join/left_anti_join_impl.cpp | 2 +-
.../exec/join/left_outer_join_impl.cpp | 2 +-
.../exec/join/left_semi_join_impl.cpp | 2 +-
.../exec/join/null_aware_left_anti_join_impl.cpp | 2 +-
.../exec/join/null_aware_left_semi_join_impl.cpp | 2 +-
.../exec/join/process_hash_table_probe.h | 56 ++--
.../exec/join/process_hash_table_probe_impl.h | 192 +++++------
.../exec/join/right_anti_join_impl.cpp | 2 +-
.../exec/join/right_outer_join_impl.cpp | 2 +-
.../exec/join/right_semi_join_impl.cpp | 2 +-
.../exec/multi_cast_data_stream_source.cpp | 7 +-
.../pipeline/exec/multi_cast_data_stream_source.h | 5 +-
be/src/pipeline/exec/olap_scan_operator.cpp | 2 +-
.../exec/partitioned_aggregation_sink_operator.h | 1 +
be/src/pipeline/exec/scan_operator.cpp | 2 +-
be/src/pipeline/exec/scan_operator.h | 11 +-
be/src/pipeline/exec/set_source_operator.cpp | 2 +-
be/src/pipeline/exec/set_source_operator.h | 2 +-
.../exec/streaming_aggregation_operator.cpp | 16 +-
.../pipeline/exec/streaming_aggregation_operator.h | 4 +-
be/src/runtime/fragment_mgr.cpp | 20 --
be/src/runtime/query_context.cpp | 21 +-
be/src/runtime/query_context.h | 2 +-
be/src/vec/common/hash_table/hash_map_context.h | 12 +-
.../vec/common/hash_table/hash_table_set_build.h | 1 +
be/src/vec/exec/join/vacquire_list.hpp | 54 ---
be/src/vec/exec/join/vhash_join_node.h | 180 ----------
be/src/vec/exec/scan/vscan_node.h | 52 ---
be/src/vec/exec/vaggregation_node.h | 366 ---------------------
56 files changed, 791 insertions(+), 1002 deletions(-)
diff --git a/be/src/pipeline/common/agg_utils.h
b/be/src/pipeline/common/agg_utils.h
new file mode 100644
index 00000000000..e0435954b8b
--- /dev/null
+++ b/be/src/pipeline/common/agg_utils.h
@@ -0,0 +1,340 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <variant>
+#include <vector>
+
+#include "vec/common/arena.h"
+#include "vec/common/hash_table/hash_map_context.h"
+#include "vec/common/hash_table/hash_map_context_creator.h"
+#include "vec/common/hash_table/hash_map_util.h"
+#include "vec/common/hash_table/ph_hash_map.h"
+#include "vec/common/hash_table/string_hash_map.h"
+
+namespace doris {
+namespace pipeline {
+
+using AggregatedDataWithoutKey = vectorized::AggregateDataPtr;
+using AggregatedDataWithStringKey = PHHashMap<StringRef,
vectorized::AggregateDataPtr>;
+using AggregatedDataWithShortStringKey =
StringHashMap<vectorized::AggregateDataPtr>;
+using AggregatedDataWithUInt8Key = PHHashMap<vectorized::UInt8,
vectorized::AggregateDataPtr>;
+using AggregatedDataWithUInt16Key = PHHashMap<vectorized::UInt16,
vectorized::AggregateDataPtr>;
+using AggregatedDataWithUInt32Key =
+ PHHashMap<vectorized::UInt32, vectorized::AggregateDataPtr,
HashCRC32<vectorized::UInt32>>;
+using AggregatedDataWithUInt64Key =
+ PHHashMap<vectorized::UInt64, vectorized::AggregateDataPtr,
HashCRC32<vectorized::UInt64>>;
+using AggregatedDataWithUInt128Key = PHHashMap<vectorized::UInt128,
vectorized::AggregateDataPtr,
+ HashCRC32<vectorized::UInt128>>;
+using AggregatedDataWithUInt256Key = PHHashMap<vectorized::UInt256,
vectorized::AggregateDataPtr,
+ HashCRC32<vectorized::UInt256>>;
+using AggregatedDataWithUInt136Key = PHHashMap<vectorized::UInt136,
vectorized::AggregateDataPtr,
+ HashCRC32<vectorized::UInt136>>;
+
+using AggregatedDataWithUInt32KeyPhase2 =
+ PHHashMap<vectorized::UInt32, vectorized::AggregateDataPtr,
+ HashMixWrapper<vectorized::UInt32>>;
+using AggregatedDataWithUInt64KeyPhase2 =
+ PHHashMap<vectorized::UInt64, vectorized::AggregateDataPtr,
+ HashMixWrapper<vectorized::UInt64>>;
+using AggregatedDataWithUInt128KeyPhase2 =
+ PHHashMap<vectorized::UInt128, vectorized::AggregateDataPtr,
+ HashMixWrapper<vectorized::UInt128>>;
+using AggregatedDataWithUInt256KeyPhase2 =
+ PHHashMap<vectorized::UInt256, vectorized::AggregateDataPtr,
+ HashMixWrapper<vectorized::UInt256>>;
+
+using AggregatedDataWithUInt136KeyPhase2 =
+ PHHashMap<vectorized::UInt136, vectorized::AggregateDataPtr,
+ HashMixWrapper<vectorized::UInt136>>;
+
+using AggregatedDataWithNullableUInt8Key =
vectorized::DataWithNullKey<AggregatedDataWithUInt8Key>;
+using AggregatedDataWithNullableUInt16Key =
+ vectorized::DataWithNullKey<AggregatedDataWithUInt16Key>;
+using AggregatedDataWithNullableUInt32Key =
+ vectorized::DataWithNullKey<AggregatedDataWithUInt32Key>;
+using AggregatedDataWithNullableUInt64Key =
+ vectorized::DataWithNullKey<AggregatedDataWithUInt64Key>;
+using AggregatedDataWithNullableUInt32KeyPhase2 =
+ vectorized::DataWithNullKey<AggregatedDataWithUInt32KeyPhase2>;
+using AggregatedDataWithNullableUInt64KeyPhase2 =
+ vectorized::DataWithNullKey<AggregatedDataWithUInt64KeyPhase2>;
+using AggregatedDataWithNullableShortStringKey =
+ vectorized::DataWithNullKey<AggregatedDataWithShortStringKey>;
+using AggregatedDataWithNullableUInt128Key =
+ vectorized::DataWithNullKey<AggregatedDataWithUInt128Key>;
+using AggregatedDataWithNullableUInt128KeyPhase2 =
+ vectorized::DataWithNullKey<AggregatedDataWithUInt128KeyPhase2>;
+
+using AggregatedMethodVariants = std::variant<
+ std::monostate,
vectorized::MethodSerialized<AggregatedDataWithStringKey>,
+ vectorized::MethodOneNumber<vectorized::UInt8,
AggregatedDataWithUInt8Key>,
+ vectorized::MethodOneNumber<vectorized::UInt16,
AggregatedDataWithUInt16Key>,
+ vectorized::MethodOneNumber<vectorized::UInt32,
AggregatedDataWithUInt32Key>,
+ vectorized::MethodOneNumber<vectorized::UInt64,
AggregatedDataWithUInt64Key>,
+ vectorized::MethodStringNoCache<AggregatedDataWithShortStringKey>,
+ vectorized::MethodOneNumber<vectorized::UInt128,
AggregatedDataWithUInt128Key>,
+ vectorized::MethodOneNumber<vectorized::UInt32,
AggregatedDataWithUInt32KeyPhase2>,
+ vectorized::MethodOneNumber<vectorized::UInt64,
AggregatedDataWithUInt64KeyPhase2>,
+ vectorized::MethodOneNumber<vectorized::UInt128,
AggregatedDataWithUInt128KeyPhase2>,
+ vectorized::MethodSingleNullableColumn<
+ vectorized::MethodOneNumber<vectorized::UInt8,
AggregatedDataWithNullableUInt8Key>>,
+ vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber<
+ vectorized::UInt16, AggregatedDataWithNullableUInt16Key>>,
+ vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber<
+ vectorized::UInt32, AggregatedDataWithNullableUInt32Key>>,
+ vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber<
+ vectorized::UInt64, AggregatedDataWithNullableUInt64Key>>,
+ vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber<
+ vectorized::UInt32,
AggregatedDataWithNullableUInt32KeyPhase2>>,
+ vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber<
+ vectorized::UInt64,
AggregatedDataWithNullableUInt64KeyPhase2>>,
+ vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber<
+ vectorized::UInt128, AggregatedDataWithNullableUInt128Key>>,
+ vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber<
+ vectorized::UInt128,
AggregatedDataWithNullableUInt128KeyPhase2>>,
+ vectorized::MethodSingleNullableColumn<
+
vectorized::MethodStringNoCache<AggregatedDataWithNullableShortStringKey>>,
+ vectorized::MethodKeysFixed<AggregatedDataWithUInt64Key, false>,
+ vectorized::MethodKeysFixed<AggregatedDataWithUInt64Key, true>,
+ vectorized::MethodKeysFixed<AggregatedDataWithUInt128Key, false>,
+ vectorized::MethodKeysFixed<AggregatedDataWithUInt128Key, true>,
+ vectorized::MethodKeysFixed<AggregatedDataWithUInt256Key, false>,
+ vectorized::MethodKeysFixed<AggregatedDataWithUInt256Key, true>,
+ vectorized::MethodKeysFixed<AggregatedDataWithUInt136Key, false>,
+ vectorized::MethodKeysFixed<AggregatedDataWithUInt136Key, true>,
+ vectorized::MethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, false>,
+ vectorized::MethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, true>,
+ vectorized::MethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, false>,
+ vectorized::MethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, true>,
+ vectorized::MethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, false>,
+ vectorized::MethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, true>,
+ vectorized::MethodKeysFixed<AggregatedDataWithUInt136KeyPhase2, false>,
+ vectorized::MethodKeysFixed<AggregatedDataWithUInt136KeyPhase2, true>>;
+
+/// Aggregation-side hash-method holder.
+///
+/// Wraps `AggregatedMethodVariants` (via `vectorized::DataVariants`) and adds the
+/// `without_key` state pointer. `init()` selects which alternative of the variant
+/// is constructed for a given key `Type`.
+struct AggregatedDataVariants
+        : public vectorized::DataVariants<AggregatedMethodVariants,
+                                          vectorized::MethodSingleNullableColumn,
+                                          vectorized::MethodOneNumber, vectorized::MethodKeysFixed,
+                                          vectorized::DataWithNullKey> {
+    // Aggregate state pointer for Type::without_key; init() leaves it untouched.
+    AggregatedDataWithoutKey without_key = nullptr;
+
+    /// Records `type` and constructs the matching hash method in `method_variant`.
+    ///
+    /// @tparam nullable selects the nullable-key flavour of the method.
+    /// @param type key layout to initialise for.
+    /// @throws Exception (ErrorCode::INTERNAL_ERROR) for any `type` not handled below.
+    template <bool nullable>
+    void init(Type type) {
+        _type = type;
+        switch (_type) {
+        case Type::without_key:
+            // No hash table is needed; only `without_key` is used.
+            break;
+        case Type::serialized:
+            method_variant.emplace<vectorized::MethodSerialized<AggregatedDataWithStringKey>>();
+            break;
+        case Type::int8_key:
+            emplace_single<vectorized::UInt8, AggregatedDataWithUInt8Key, nullable>();
+            break;
+        case Type::int16_key:
+            emplace_single<vectorized::UInt16, AggregatedDataWithUInt16Key, nullable>();
+            break;
+        case Type::int32_key:
+            emplace_single<vectorized::UInt32, AggregatedDataWithUInt32Key, nullable>();
+            break;
+        case Type::int32_key_phase2:
+            emplace_single<vectorized::UInt32, AggregatedDataWithUInt32KeyPhase2, nullable>();
+            break;
+        case Type::int64_key:
+            emplace_single<vectorized::UInt64, AggregatedDataWithUInt64Key, nullable>();
+            break;
+        case Type::int64_key_phase2:
+            emplace_single<vectorized::UInt64, AggregatedDataWithUInt64KeyPhase2, nullable>();
+            break;
+        case Type::int128_key:
+            emplace_single<vectorized::UInt128, AggregatedDataWithUInt128Key, nullable>();
+            break;
+        case Type::int128_key_phase2:
+            emplace_single<vectorized::UInt128, AggregatedDataWithUInt128KeyPhase2, nullable>();
+            break;
+        case Type::string_key:
+            // `nullable` is a compile-time constant, so only the selected branch
+            // needs to be instantiated.
+            if constexpr (nullable) {
+                method_variant.emplace<
+                        vectorized::MethodSingleNullableColumn<vectorized::MethodStringNoCache<
+                                AggregatedDataWithNullableShortStringKey>>>();
+            } else {
+                method_variant.emplace<
+                        vectorized::MethodStringNoCache<AggregatedDataWithShortStringKey>>();
+            }
+            break;
+        default:
+            throw Exception(ErrorCode::INTERNAL_ERROR, "meet invalid key type, type={}", type);
+        }
+    }
+
+    /// Runtime front-end for `init<nullable>()`: dispatches on `is_nullable`.
+    void init(Type type, bool is_nullable = false) {
+        if (is_nullable) {
+            init<true>(type);
+        } else {
+            init<false>(type);
+        }
+    }
+};
+
+using AggregatedDataVariantsUPtr = std::unique_ptr<AggregatedDataVariants>;
+using ArenaUPtr = std::unique_ptr<vectorized::Arena>;
+
+/// Append-only storage for (key, aggregate state) pairs.
+///
+/// Keys and aggregate states live in parallel "sub containers" allocated from
+/// `_arena_pool`; each sub container has room for SUB_CONTAINER_CAPACITY entries.
+/// Entry `i` is found in sub container `i / SUB_CONTAINER_CAPACITY` at position
+/// `i % SUB_CONTAINER_CAPACITY`.
+struct AggregateDataContainer {
+public:
+    /// @param size_of_key byte size of one key slot.
+    /// @param size_of_aggregate_states byte size of one aggregate-state slot.
+    AggregateDataContainer(size_t size_of_key, size_t size_of_aggregate_states)
+            : _size_of_key(size_of_key), _size_of_aggregate_states(size_of_aggregate_states) {}
+
+    /// Size reported by the backing arena.
+    int64_t memory_usage() const { return _arena_pool.size(); }
+
+    /// Stores `key` in the next key slot and returns the aggregate-state slot
+    /// paired with it. The returned memory is not initialised here.
+    ///
+    /// May throw whatever the arena allocation or vector growth in `_expand()`
+    /// throws; in that case no entry is added.
+    template <typename KeyType>
+    vectorized::AggregateDataPtr append_data(const KeyType& key) {
+        DCHECK_EQ(sizeof(KeyType), _size_of_key);
+        // A new sub container is needed both on the very first append (index 0)
+        // and whenever the current one has been filled completely.
+        if (UNLIKELY(_index_in_sub_container % SUB_CONTAINER_CAPACITY == 0)) {
+            _expand();
+        }
+
+        *reinterpret_cast<KeyType*>(_current_keys) = key;
+        auto* aggregate_data = _current_agg_data;
+        ++_total_count;
+        ++_index_in_sub_container;
+        _current_agg_data += _size_of_aggregate_states;
+        _current_keys += _size_of_key;
+        return aggregate_data;
+    }
+
+    /// Forward cursor over the stored entries (CRTP base shared by Iterator and
+    /// ConstIterator). Equality compares the flat entry index only.
+    template <typename Derived, bool IsConst>
+    class IteratorBase {
+        using Container =
+                std::conditional_t<IsConst, const AggregateDataContainer, AggregateDataContainer>;
+
+        Container* container = nullptr;
+        // Zero-initialised so that a default-constructed iterator (for example the
+        // `iterator` member before init_once()) never exposes indeterminate values.
+        uint32_t index = 0;
+        uint32_t sub_container_index = 0;
+        uint32_t index_in_sub_container = 0;
+
+        friend class HashTable;
+
+    public:
+        IteratorBase() = default;
+        IteratorBase(Container* container_, uint32_t index_)
+                : container(container_),
+                  index(index_),
+                  sub_container_index(index_ / SUB_CONTAINER_CAPACITY),
+                  index_in_sub_container(index_ % SUB_CONTAINER_CAPACITY) {}
+
+        bool operator==(const IteratorBase& rhs) const { return index == rhs.index; }
+        bool operator!=(const IteratorBase& rhs) const { return index != rhs.index; }
+
+        /// Advances to the next entry, rolling over into the next sub container
+        /// when the current one is exhausted.
+        Derived& operator++() {
+            ++index;
+            ++index_in_sub_container;
+            if (index_in_sub_container == SUB_CONTAINER_CAPACITY) {
+                index_in_sub_container = 0;
+                ++sub_container_index;
+            }
+            return static_cast<Derived&>(*this);
+        }
+
+        /// Returns a copy of the key at the current position, read as `KeyType`.
+        template <typename KeyType>
+        KeyType get_key() {
+            DCHECK_EQ(sizeof(KeyType), container->_size_of_key);
+            auto* keys = reinterpret_cast<KeyType*>(container->_key_containers[sub_container_index]);
+            return keys[index_in_sub_container];
+        }
+
+        /// Returns the aggregate-state slot at the current position.
+        vectorized::AggregateDataPtr get_aggregate_data() {
+            auto* states = container->_value_containers[sub_container_index];
+            return &states[container->_size_of_aggregate_states * index_in_sub_container];
+        }
+    };
+
+    class Iterator : public IteratorBase<Iterator, false> {
+    public:
+        using IteratorBase<Iterator, false>::IteratorBase;
+    };
+
+    class ConstIterator : public IteratorBase<ConstIterator, true> {
+    public:
+        using IteratorBase<ConstIterator, true>::IteratorBase;
+    };
+
+    ConstIterator begin() const { return ConstIterator(this, 0); }
+
+    ConstIterator cbegin() const { return begin(); }
+
+    Iterator begin() { return Iterator(this, 0); }
+
+    ConstIterator end() const { return ConstIterator(this, _total_count); }
+    ConstIterator cend() const { return end(); }
+    Iterator end() { return Iterator(this, _total_count); }
+
+    /// Points `iterator` at begin() the first time it is called; later calls are
+    /// no-ops so the cursor position is preserved.
+    void init_once() {
+        if (_inited) {
+            return;
+        }
+        _inited = true;
+        iterator = begin();
+    }
+    // Cursor kept inside the container itself; positioned by init_once().
+    Iterator iterator;
+
+private:
+    /// Allocates one more key sub container and one more value sub container and
+    /// makes them the current write targets.
+    ///
+    /// On failure the container is rolled back to the state before the call
+    /// (apart from memory already taken from the arena) and the exception is
+    /// rethrown.
+    void _expand() {
+        _index_in_sub_container = 0;
+        _current_keys = nullptr;
+        _current_agg_data = nullptr;
+        // Rollback must be driven by what was actually pushed, not by whether a
+        // pointer is non-null: an allocation can succeed while the following
+        // emplace_back throws, and popping then would drop a live sub container.
+        bool keys_registered = false;
+        try {
+            _current_keys = _arena_pool.alloc(_size_of_key * SUB_CONTAINER_CAPACITY);
+            _key_containers.emplace_back(_current_keys);
+            keys_registered = true;
+
+            _current_agg_data = reinterpret_cast<vectorized::AggregateDataPtr>(
+                    _arena_pool.alloc(_size_of_aggregate_states * SUB_CONTAINER_CAPACITY));
+            // Last throwing operation: if it fails nothing was added to
+            // _value_containers, so there is nothing to pop for the values.
+            _value_containers.emplace_back(_current_agg_data);
+        } catch (...) {
+            if (keys_registered) {
+                _key_containers.pop_back();
+            }
+            _current_keys = nullptr;
+            _current_agg_data = nullptr;
+            throw;
+        }
+    }
+
+    static constexpr uint32_t SUB_CONTAINER_CAPACITY = 8192;
+    vectorized::Arena _arena_pool;
+    std::vector<char*> _key_containers;
+    std::vector<vectorized::AggregateDataPtr> _value_containers;
+    // Next free slots inside the current sub containers.
+    vectorized::AggregateDataPtr _current_agg_data = nullptr;
+    char* _current_keys = nullptr;
+    size_t _size_of_key {};
+    size_t _size_of_aggregate_states {};
+    // Entries written into the current sub container / into the whole container.
+    uint32_t _index_in_sub_container {};
+    uint32_t _total_count {};
+    bool _inited = false;
+};
+
+} // namespace pipeline
+
+constexpr auto init_agg_hash_method =
+ init_hash_method<pipeline::AggregatedDataVariants,
vectorized::AggregateDataPtr>;
+
+} // namespace doris
diff --git a/be/src/vec/exec/data_gen_functions/vdata_gen_function_inf.h
b/be/src/pipeline/common/data_gen_functions/vdata_gen_function_inf.h
similarity index 97%
rename from be/src/vec/exec/data_gen_functions/vdata_gen_function_inf.h
rename to be/src/pipeline/common/data_gen_functions/vdata_gen_function_inf.h
index 515be45ad13..bb9ffdb74b7 100644
--- a/be/src/vec/exec/data_gen_functions/vdata_gen_function_inf.h
+++ b/be/src/pipeline/common/data_gen_functions/vdata_gen_function_inf.h
@@ -29,7 +29,7 @@ class RuntimeState;
class Status;
class TScanRangeParams;
-namespace vectorized {
+namespace pipeline {
class VDataGenFunctionInf {
public:
@@ -51,6 +51,6 @@ protected:
const TupleDescriptor* _tuple_desc = nullptr;
};
-} // namespace vectorized
+} // namespace pipeline
} // namespace doris
diff --git a/be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp
b/be/src/pipeline/common/data_gen_functions/vnumbers_tvf.cpp
similarity index 87%
rename from be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp
rename to be/src/pipeline/common/data_gen_functions/vnumbers_tvf.cpp
index d33d02aa953..3bbcb544abc 100644
--- a/be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp
+++ b/be/src/pipeline/common/data_gen_functions/vnumbers_tvf.cpp
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "vec/exec/data_gen_functions/vnumbers_tvf.h"
+#include "pipeline/common/data_gen_functions/vnumbers_tvf.h"
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
@@ -35,7 +35,7 @@
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
-namespace doris::vectorized {
+namespace doris::pipeline {
VNumbersTVF::VNumbersTVF(TupleId tuple_id, const TupleDescriptor* tuple_desc)
: VDataGenFunctionInf(tuple_id, tuple_desc) {}
@@ -59,7 +59,7 @@ Status VNumbersTVF::get_next(RuntimeState* state,
vectorized::Block* block, bool
*eos = true;
continue;
}
- auto* column_res = assert_cast<ColumnInt64*>(columns[i].get());
//BIGINT
+ auto* column_res =
assert_cast<vectorized::ColumnInt64*>(columns[i].get()); //BIGINT
int64_t end_value = std::min((int64_t)(_next_number + batch_size),
_total_numbers);
if (_use_const) {
column_res->insert_many_vals(_const_value, end_value -
_next_number);
@@ -78,9 +78,9 @@ Status VNumbersTVF::get_next(RuntimeState* state,
vectorized::Block* block, bool
} else {
size_t n_columns = 0;
for (const auto* slot_desc : _tuple_desc->slots()) {
-
block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
- slot_desc->get_data_type_ptr(),
- slot_desc->col_name()));
+
block->insert(vectorized::ColumnWithTypeAndName(std::move(columns[n_columns++]),
+
slot_desc->get_data_type_ptr(),
+
slot_desc->col_name()));
}
}
return Status::OK();
@@ -97,4 +97,4 @@ Status VNumbersTVF::set_scan_ranges(const
std::vector<TScanRangeParams>& scan_ra
return Status::OK();
}
-} // namespace doris::vectorized
+} // namespace doris::pipeline
diff --git a/be/src/vec/exec/data_gen_functions/vnumbers_tvf.h
b/be/src/pipeline/common/data_gen_functions/vnumbers_tvf.h
similarity index 93%
rename from be/src/vec/exec/data_gen_functions/vnumbers_tvf.h
rename to be/src/pipeline/common/data_gen_functions/vnumbers_tvf.h
index 1968637fd36..bf8b117a378 100644
--- a/be/src/vec/exec/data_gen_functions/vnumbers_tvf.h
+++ b/be/src/pipeline/common/data_gen_functions/vnumbers_tvf.h
@@ -21,7 +21,7 @@
#include <vector>
#include "common/global_types.h"
-#include "vec/exec/data_gen_functions/vdata_gen_function_inf.h"
+#include "pipeline/common/data_gen_functions/vdata_gen_function_inf.h"
namespace doris {
@@ -30,7 +30,7 @@ class RuntimeState;
class Status;
class TScanRangeParams;
-namespace vectorized {
+namespace pipeline {
class Block;
class VNumbersTVF : public VDataGenFunctionInf {
@@ -51,6 +51,6 @@ private:
int64_t _next_number = 0;
};
-} // namespace vectorized
+} // namespace pipeline
} // namespace doris
diff --git a/be/src/pipeline/common/join_utils.h
b/be/src/pipeline/common/join_utils.h
new file mode 100644
index 00000000000..cd3374995f7
--- /dev/null
+++ b/be/src/pipeline/common/join_utils.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <variant>
+#include <vector>
+
+#include "vec/common/hash_table/hash_map_context_creator.h"
+#include "vec/common/hash_table/hash_map_util.h"
+
+namespace doris::pipeline {
+using JoinOpVariants =
+ std::variant<std::integral_constant<TJoinOp::type,
TJoinOp::INNER_JOIN>,
+ std::integral_constant<TJoinOp::type,
TJoinOp::LEFT_SEMI_JOIN>,
+ std::integral_constant<TJoinOp::type,
TJoinOp::LEFT_ANTI_JOIN>,
+ std::integral_constant<TJoinOp::type,
TJoinOp::LEFT_OUTER_JOIN>,
+ std::integral_constant<TJoinOp::type,
TJoinOp::FULL_OUTER_JOIN>,
+ std::integral_constant<TJoinOp::type,
TJoinOp::RIGHT_OUTER_JOIN>,
+ std::integral_constant<TJoinOp::type,
TJoinOp::CROSS_JOIN>,
+ std::integral_constant<TJoinOp::type,
TJoinOp::RIGHT_SEMI_JOIN>,
+ std::integral_constant<TJoinOp::type,
TJoinOp::RIGHT_ANTI_JOIN>,
+ std::integral_constant<TJoinOp::type,
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>,
+ std::integral_constant<TJoinOp::type,
TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN>>;
+
+using I8HashTableContext =
vectorized::PrimaryTypeHashTableContext<vectorized::UInt8>;
+using I16HashTableContext =
vectorized::PrimaryTypeHashTableContext<vectorized::UInt16>;
+using I32HashTableContext =
vectorized::PrimaryTypeHashTableContext<vectorized::UInt32>;
+using I64HashTableContext = vectorized::PrimaryTypeHashTableContext<UInt64>;
+using I128HashTableContext = vectorized::PrimaryTypeHashTableContext<UInt128>;
+using I256HashTableContext = vectorized::PrimaryTypeHashTableContext<UInt256>;
+
+template <bool has_null>
+using I64FixedKeyHashTableContext =
vectorized::FixedKeyHashTableContext<UInt64, has_null>;
+
+template <bool has_null>
+using I128FixedKeyHashTableContext =
vectorized::FixedKeyHashTableContext<UInt128, has_null>;
+
+template <bool has_null>
+using I256FixedKeyHashTableContext =
vectorized::FixedKeyHashTableContext<UInt256, has_null>;
+
+template <bool has_null>
+using I136FixedKeyHashTableContext =
vectorized::FixedKeyHashTableContext<UInt136, has_null>;
+
+using HashTableVariants =
+ std::variant<std::monostate, vectorized::SerializedHashTableContext,
I8HashTableContext,
+ I16HashTableContext, I32HashTableContext,
I64HashTableContext,
+ I128HashTableContext, I256HashTableContext,
I64FixedKeyHashTableContext<true>,
+ I64FixedKeyHashTableContext<false>,
I128FixedKeyHashTableContext<true>,
+ I128FixedKeyHashTableContext<false>,
I256FixedKeyHashTableContext<true>,
+ I256FixedKeyHashTableContext<false>,
I136FixedKeyHashTableContext<true>,
+ I136FixedKeyHashTableContext<false>>;
+
+} // namespace doris::pipeline
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp
b/be/src/pipeline/common/runtime_filter_consumer.cpp
similarity index 96%
rename from be/src/vec/exec/runtime_filter_consumer.cpp
rename to be/src/pipeline/common/runtime_filter_consumer.cpp
index 8f3a0e9fac1..0e9c2d0f304 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/pipeline/common/runtime_filter_consumer.cpp
@@ -15,16 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-#include "vec/exec/runtime_filter_consumer.h"
+#include "pipeline/common/runtime_filter_consumer.h"
#include "pipeline/pipeline_task.h"
-namespace doris::vectorized {
+namespace doris::pipeline {
RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id,
const
std::vector<TRuntimeFilterDesc>& runtime_filters,
const RowDescriptor&
row_descriptor,
- VExprContextSPtrs& conjuncts)
+ vectorized::VExprContextSPtrs&
conjuncts)
: _filter_id(filter_id),
_runtime_filter_descs(runtime_filters),
_row_descriptor_ref(row_descriptor),
@@ -148,7 +148,7 @@ Status RuntimeFilterConsumer::_append_rf_into_conjuncts(
}
for (const auto& expr : vexprs) {
- VExprContextSPtr conjunct = VExprContext::create_shared(expr);
+ vectorized::VExprContextSPtr conjunct =
vectorized::VExprContext::create_shared(expr);
RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor_ref));
RETURN_IF_ERROR(conjunct->open(_state));
_rf_vexpr_set.insert(expr);
@@ -202,4 +202,4 @@ void
RuntimeFilterConsumer::_prepare_rf_timer(RuntimeProfile* profile) {
_acquire_runtime_filter_timer = ADD_TIMER(profile,
"AcquireRuntimeFilterTime");
}
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/vec/exec/runtime_filter_consumer.h
b/be/src/pipeline/common/runtime_filter_consumer.h
similarity index 92%
rename from be/src/vec/exec/runtime_filter_consumer.h
rename to be/src/pipeline/common/runtime_filter_consumer.h
index 7f06edcbf09..9bee6053f6f 100644
--- a/be/src/vec/exec/runtime_filter_consumer.h
+++ b/be/src/pipeline/common/runtime_filter_consumer.h
@@ -20,13 +20,14 @@
#include "exprs/runtime_filter.h"
#include "pipeline/dependency.h"
-namespace doris::vectorized {
+namespace doris::pipeline {
class RuntimeFilterConsumer {
public:
RuntimeFilterConsumer(const int32_t filter_id,
const std::vector<TRuntimeFilterDesc>&
runtime_filters,
- const RowDescriptor& row_descriptor,
VExprContextSPtrs& conjuncts);
+ const RowDescriptor& row_descriptor,
+ vectorized::VExprContextSPtrs& conjuncts);
~RuntimeFilterConsumer() = default;
Status init(RuntimeState* state, bool need_local_merge = false);
@@ -65,7 +66,7 @@ protected:
// Set to true if the runtime filter is ready.
std::vector<bool> _runtime_filter_ready_flag;
std::mutex _rf_locks;
- phmap::flat_hash_set<VExprSPtr> _rf_vexpr_set;
+ phmap::flat_hash_set<vectorized::VExprSPtr> _rf_vexpr_set;
RuntimeState* _state = nullptr;
private:
@@ -85,4 +86,4 @@ private:
RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr;
};
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 5ce5e0a56a3..68c00af409d 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -21,11 +21,14 @@
#include <mutex>
#include "common/logging.h"
+#include "exprs/runtime_filter.h"
#include "pipeline/exec/multi_cast_data_streamer.h"
#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/pipeline_task.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
+#include "vec/exprs/vectorized_agg_fn.h"
+#include "vec/exprs/vslot_ref.h"
#include "vec/spill/spill_stream_manager.h"
namespace doris::pipeline {
@@ -297,7 +300,7 @@ Status AggSharedState::reset_hash_table() {
RETURN_IF_ERROR(st);
}
- aggregate_data_container.reset(new
vectorized::AggregateDataContainer(
+ aggregate_data_container.reset(new
AggregateDataContainer(
sizeof(typename HashTableType::key_type),
((total_size_of_aggregate_states +
align_aggregate_states - 1) /
align_aggregate_states) *
@@ -376,4 +379,19 @@ MultiCastSharedState::MultiCastSharedState(const
RowDescriptor& row_desc, Object
:
multi_cast_data_streamer(std::make_unique<pipeline::MultiCastDataStreamer>(
row_desc, pool, cast_sender_count, true)) {}
+// Returns the column id of the single slot reference that feeds `evaluator`.
+// Aborts via CHECK unless the evaluator has exactly one input expression and the
+// root of that expression is a slot reference.
+int AggSharedState::get_slot_column_id(const vectorized::AggFnEvaluator* evaluator) {
+    // Bind by const reference so a reference-returning accessor is not copied.
+    const auto& ctxs = evaluator->input_exprs_ctxs();
+    // Checked separately: the diagnostic below dereferences ctxs[0], which must
+    // not happen when the vector is empty.
+    CHECK(ctxs.size() == 1) << "input_exprs_ctxs is invalid, size=" << ctxs.size();
+    CHECK(ctxs[0]->root()->is_slot_ref())
+            << "input_exprs_ctxs is invalid, input_exprs_ctx[0]="
+            << ctxs[0]->root()->debug_string();
+    return static_cast<vectorized::VSlotRef*>(ctxs[0]->root().get())->column_id();
+}
+
+// Calls destroy() on every aggregate function's state inside the row at `data`;
+// state i is located at `data + offsets_of_aggregate_states[i]`.
+// Always returns Status::OK().
+Status AggSharedState::_destroy_agg_status(vectorized::AggregateDataPtr data) {
+    // size_t index: avoids comparing a signed int against the unsigned size().
+    for (size_t i = 0; i < aggregate_evaluators.size(); ++i) {
+        aggregate_evaluators[i]->function()->destroy(data + offsets_of_aggregate_states[i]);
+    }
+    return Status::OK();
+}
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index b7b4258f53a..46335caade5 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -29,16 +29,20 @@
#include "common/logging.h"
#include "concurrentqueue.h"
#include "gutil/integral_types.h"
+#include "pipeline/common/agg_utils.h"
+#include "pipeline/common/join_utils.h"
#include "pipeline/exec/data_queue.h"
-#include "vec/common/hash_table/hash_map_context_creator.h"
+#include "pipeline/exec/join/process_hash_table_probe.h"
#include "vec/common/sort/partition_sorter.h"
#include "vec/common/sort/sorter.h"
#include "vec/core/types.h"
-#include "vec/exec/join/process_hash_table_probe.h"
-#include "vec/exec/join/vhash_join_node.h"
-#include "vec/exec/vaggregation_node.h"
#include "vec/spill/spill_stream.h"
+namespace doris::vectorized {
+class AggFnEvaluator;
+class VSlotRef;
+} // namespace doris::vectorized
+
namespace doris::pipeline {
class Dependency;
@@ -294,7 +298,7 @@ struct AggSharedState : public BasicSharedState {
ENABLE_FACTORY_CREATOR(AggSharedState)
public:
AggSharedState() {
- agg_data = std::make_unique<vectorized::AggregatedDataVariants>();
+ agg_data = std::make_unique<AggregatedDataVariants>();
agg_arena_pool = std::make_unique<vectorized::Arena>();
}
~AggSharedState() override {
@@ -313,17 +317,11 @@ public:
// We should call this function only at 1st phase.
// 1st phase: is_merge=true, only have one SlotRef.
// 2nd phase: is_merge=false, maybe have multiple exprs.
- static int get_slot_column_id(const vectorized::AggFnEvaluator* evaluator)
{
- auto ctxs = evaluator->input_exprs_ctxs();
- CHECK(ctxs.size() == 1 && ctxs[0]->root()->is_slot_ref())
- << "input_exprs_ctxs is invalid, input_exprs_ctx[0]="
- << ctxs[0]->root()->debug_string();
- return ((vectorized::VSlotRef*)ctxs[0]->root().get())->column_id();
- }
+ static int get_slot_column_id(const vectorized::AggFnEvaluator* evaluator);
- vectorized::AggregatedDataVariantsUPtr agg_data = nullptr;
- std::unique_ptr<vectorized::AggregateDataContainer>
aggregate_data_container;
- vectorized::ArenaUPtr agg_arena_pool;
+ AggregatedDataVariantsUPtr agg_data = nullptr;
+ std::unique_ptr<AggregateDataContainer> aggregate_data_container;
+ ArenaUPtr agg_arena_pool;
std::vector<vectorized::AggFnEvaluator*> aggregate_evaluators;
// group by k1,k2
vectorized::VExprContextSPtrs probe_expr_ctxs;
@@ -445,12 +443,7 @@ private:
agg_data_created_without_key = false;
}
}
- Status _destroy_agg_status(vectorized::AggregateDataPtr data) {
- for (int i = 0; i < aggregate_evaluators.size(); ++i) {
- aggregate_evaluators[i]->function()->destroy(data +
offsets_of_aggregate_states[i]);
- }
- return Status::OK();
- }
+ Status _destroy_agg_status(vectorized::AggregateDataPtr data);
};
struct AggSpillPartition;
@@ -613,19 +606,6 @@ public:
std::vector<int64_t> ordey_by_column_idxs;
};
-using JoinOpVariants =
- std::variant<std::integral_constant<TJoinOp::type,
TJoinOp::INNER_JOIN>,
- std::integral_constant<TJoinOp::type,
TJoinOp::LEFT_SEMI_JOIN>,
- std::integral_constant<TJoinOp::type,
TJoinOp::LEFT_ANTI_JOIN>,
- std::integral_constant<TJoinOp::type,
TJoinOp::LEFT_OUTER_JOIN>,
- std::integral_constant<TJoinOp::type,
TJoinOp::FULL_OUTER_JOIN>,
- std::integral_constant<TJoinOp::type,
TJoinOp::RIGHT_OUTER_JOIN>,
- std::integral_constant<TJoinOp::type,
TJoinOp::CROSS_JOIN>,
- std::integral_constant<TJoinOp::type,
TJoinOp::RIGHT_SEMI_JOIN>,
- std::integral_constant<TJoinOp::type,
TJoinOp::RIGHT_ANTI_JOIN>,
- std::integral_constant<TJoinOp::type,
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>,
- std::integral_constant<TJoinOp::type,
TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN>>;
-
struct JoinSharedState : public BasicSharedState {
// For some join case, we can apply a short circuit strategy
// 1. _has_null_in_build_side = true
@@ -646,8 +626,7 @@ struct HashJoinSharedState : public JoinSharedState {
std::shared_ptr<vectorized::Arena> arena =
std::make_shared<vectorized::Arena>();
// maybe share hash table with other fragment instances
- std::shared_ptr<vectorized::HashTableVariants> hash_table_variants =
- std::make_shared<vectorized::HashTableVariants>();
+ std::shared_ptr<HashTableVariants> hash_table_variants =
std::make_shared<HashTableVariants>();
const std::vector<TupleDescriptor*> build_side_child_desc;
size_t build_exprs_size = 0;
std::shared_ptr<vectorized::Block> build_block;
@@ -696,23 +675,23 @@ public:
~AsyncWriterDependency() override = default;
};
-using SetHashTableVariants = std::variant<
- std::monostate,
- vectorized::MethodSerialized<HashMap<StringRef,
vectorized::RowRefListWithFlags>>,
- vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt8>,
- vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt16>,
- vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt32>,
- vectorized::SetPrimaryTypeHashTableContext<UInt64>,
- vectorized::SetPrimaryTypeHashTableContext<UInt128>,
- vectorized::SetPrimaryTypeHashTableContext<UInt256>,
- vectorized::SetFixedKeyHashTableContext<UInt64, true>,
- vectorized::SetFixedKeyHashTableContext<UInt64, false>,
- vectorized::SetFixedKeyHashTableContext<UInt128, true>,
- vectorized::SetFixedKeyHashTableContext<UInt128, false>,
- vectorized::SetFixedKeyHashTableContext<UInt256, true>,
- vectorized::SetFixedKeyHashTableContext<UInt256, false>,
- vectorized::SetFixedKeyHashTableContext<UInt136, true>,
- vectorized::SetFixedKeyHashTableContext<UInt136, false>>;
+using SetHashTableVariants =
+ std::variant<std::monostate,
+ vectorized::MethodSerialized<HashMap<StringRef,
RowRefListWithFlags>>,
+
vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt8>,
+
vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt16>,
+
vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt32>,
+ vectorized::SetPrimaryTypeHashTableContext<UInt64>,
+ vectorized::SetPrimaryTypeHashTableContext<UInt128>,
+ vectorized::SetPrimaryTypeHashTableContext<UInt256>,
+ vectorized::SetFixedKeyHashTableContext<UInt64, true>,
+ vectorized::SetFixedKeyHashTableContext<UInt64, false>,
+ vectorized::SetFixedKeyHashTableContext<UInt128, true>,
+ vectorized::SetFixedKeyHashTableContext<UInt128, false>,
+ vectorized::SetFixedKeyHashTableContext<UInt256, true>,
+ vectorized::SetFixedKeyHashTableContext<UInt256, false>,
+ vectorized::SetFixedKeyHashTableContext<UInt136, true>,
+ vectorized::SetFixedKeyHashTableContext<UInt136, false>>;
struct SetSharedState : public BasicSharedState {
ENABLE_FACTORY_CREATOR(SetSharedState)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 9870571fa5e..41498fd94fa 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -24,6 +24,7 @@
#include "pipeline/exec/operator.h"
#include "runtime/primitive_type.h"
#include "vec/common/hash_table/hash.h"
+#include "vec/exprs/vectorized_agg_fn.h"
namespace doris::pipeline {
@@ -110,25 +111,24 @@ Status AggSinkLocalState::open(RuntimeState* state) {
} else {
RETURN_IF_ERROR(_init_hash_method(Base::_shared_state->probe_expr_ctxs));
- std::visit(vectorized::Overload {
- [&](std::monostate& arg) {
- throw
doris::Exception(ErrorCode::INTERNAL_ERROR,
- "uninited hash table");
- },
- [&](auto& agg_method) {
- using HashTableType =
std::decay_t<decltype(agg_method)>;
- using KeyType = typename HashTableType::Key;
-
- /// some aggregate functions (like AVG for
decimal) have align issues.
- Base::_shared_state->aggregate_data_container =
-
std::make_unique<vectorized::AggregateDataContainer>(
-
- sizeof(KeyType),
-
((p._total_size_of_aggregate_states +
- p._align_aggregate_states -
1) /
- p._align_aggregate_states) *
-
p._align_aggregate_states);
- }},
+ std::visit(vectorized::Overload {[&](std::monostate& arg) {
+ throw
doris::Exception(ErrorCode::INTERNAL_ERROR,
+ "uninited
hash table");
+ },
+ [&](auto& agg_method) {
+ using HashTableType =
+
std::decay_t<decltype(agg_method)>;
+ using KeyType = typename
HashTableType::Key;
+
+ /// some aggregate functions
(like AVG for decimal) have align issues.
+
Base::_shared_state->aggregate_data_container =
+
std::make_unique<AggregateDataContainer>(
+ sizeof(KeyType),
+
((p._total_size_of_aggregate_states +
+
p._align_aggregate_states - 1) /
+
p._align_aggregate_states) *
+
p._align_aggregate_states);
+ }},
_agg_data->method_variant);
if (p._is_merge) {
_executor = std::make_unique<Executor<false, true>>();
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 2964cba9a1d..add6712453f 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -119,7 +119,7 @@ protected:
vectorized::Block _preagg_block = vectorized::Block();
- vectorized::AggregatedDataVariants* _agg_data = nullptr;
+ AggregatedDataVariants* _agg_data = nullptr;
vectorized::Arena* _agg_arena_pool = nullptr;
std::unique_ptr<ExecutorBase> _executor = nullptr;
@@ -156,7 +156,7 @@ public:
bool require_data_distribution() const override { return _is_colocate; }
size_t get_revocable_mem_size(RuntimeState* state) const;
- vectorized::AggregatedDataVariants* get_agg_data(RuntimeState* state) {
+ AggregatedDataVariants* get_agg_data(RuntimeState* state) {
auto& local_state = get_local_state(state);
return local_state._agg_data;
}
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index cca9fefbdb2..5b371877f36 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -22,6 +22,7 @@
#include "common/exception.h"
#include "pipeline/exec/operator.h"
+#include "vec/exprs/vectorized_agg_fn.h"
namespace doris::pipeline {
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 5358cffcff5..43859c7cebd 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -21,6 +21,7 @@
#include <string>
#include "pipeline/exec/operator.h"
+#include "vec/exprs/vectorized_agg_fn.h"
namespace doris::pipeline {
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp
b/be/src/pipeline/exec/analytic_source_operator.cpp
index 6d30d02ff53..bc8f3279f92 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -21,6 +21,7 @@
#include "pipeline/exec/operator.h"
#include "vec/columns/column_nullable.h"
+#include "vec/exprs/vectorized_agg_fn.h"
namespace doris::pipeline {
diff --git a/be/src/pipeline/exec/datagen_operator.cpp
b/be/src/pipeline/exec/datagen_operator.cpp
index 95b284c94b4..39e35ed8836 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -19,10 +19,11 @@
#include <memory>
+#include "exprs/runtime_filter.h"
+#include "pipeline/common/data_gen_functions/vdata_gen_function_inf.h"
+#include "pipeline/common/data_gen_functions/vnumbers_tvf.h"
#include "pipeline/exec/operator.h"
#include "util/runtime_profile.h"
-#include "vec/exec/data_gen_functions/vdata_gen_function_inf.h"
-#include "vec/exec/data_gen_functions/vnumbers_tvf.h"
namespace doris {
class RuntimeState;
@@ -76,7 +77,7 @@ Status DataGenSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block*
Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
auto& p = _parent->cast<DataGenSourceOperatorX>();
- _table_func = std::make_shared<vectorized::VNumbersTVF>(p._tuple_id,
p._tuple_desc);
+ _table_func = std::make_shared<VNumbersTVF>(p._tuple_id, p._tuple_desc);
_table_func->set_tuple_desc(p._tuple_desc);
RETURN_IF_ERROR(_table_func->set_scan_ranges(info.scan_ranges));
diff --git a/be/src/pipeline/exec/datagen_operator.h
b/be/src/pipeline/exec/datagen_operator.h
index d4d649a6c06..8aeeea2a699 100644
--- a/be/src/pipeline/exec/datagen_operator.h
+++ b/be/src/pipeline/exec/datagen_operator.h
@@ -20,8 +20,8 @@
#include <stdint.h>
#include "common/status.h"
+#include "pipeline/common/data_gen_functions/vdata_gen_function_inf.h"
#include "pipeline/exec/operator.h"
-#include "vec/exec/data_gen_functions/vdata_gen_function_inf.h"
namespace doris {
class RuntimeState;
@@ -43,7 +43,7 @@ public:
private:
friend class DataGenSourceOperatorX;
- std::shared_ptr<vectorized::VDataGenFunctionInf> _table_func;
+ std::shared_ptr<VDataGenFunctionInf> _table_func;
};
class DataGenSourceOperatorX final : public OperatorX<DataGenLocalState> {
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index f0cb738a303..73ce8ce5fb4 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -23,6 +23,7 @@
#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
+#include "vec/exprs/vectorized_agg_fn.h"
namespace doris {
class ExecNode;
@@ -61,7 +62,7 @@
DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta
dummy_mapped_data(std::make_shared<char>('A')),
batch_size(state->batch_size()),
_agg_arena_pool(std::make_unique<vectorized::Arena>()),
- _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()),
+ _agg_data(std::make_unique<AggregatedDataVariants>()),
_agg_profile_arena(std::make_unique<vectorized::Arena>()),
_child_block(vectorized::Block::create_unique()),
_aggregated_block(vectorized::Block::create_unique()) {}
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index e8166446678..d6ff5fde0c5 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -73,7 +73,7 @@ private:
bool _stop_emplace_flag = false;
const int batch_size;
std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr;
- vectorized::AggregatedDataVariantsUPtr _agg_data = nullptr;
+ AggregatedDataVariantsUPtr _agg_data = nullptr;
std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
// group by k1,k2
vectorized::VExprContextSPtrs _probe_expr_ctxs;
diff --git a/be/src/pipeline/exec/es_scan_operator.cpp
b/be/src/pipeline/exec/es_scan_operator.cpp
index aab16ff3bff..9fcfdd999d4 100644
--- a/be/src/pipeline/exec/es_scan_operator.cpp
+++ b/be/src/pipeline/exec/es_scan_operator.cpp
@@ -81,12 +81,11 @@ Status
EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* sca
}
properties[ESScanReader::KEY_SHARD] =
std::to_string(es_scan_range->shard_id);
properties[ESScanReader::KEY_BATCH_SIZE] =
-
std::to_string(vectorized::RuntimeFilterConsumer::_state->batch_size());
+ std::to_string(RuntimeFilterConsumer::_state->batch_size());
properties[ESScanReader::KEY_HOST_PORT] =
get_host_and_port(es_scan_range->es_hosts);
// push down limit to Elasticsearch
// if predicate in _conjunct_ctxs can not be processed by
Elasticsearch, we can not push down limit operator to Elasticsearch
- if (p.limit() != -1 &&
- p.limit() <=
vectorized::RuntimeFilterConsumer::_state->batch_size()) {
+ if (p.limit() != -1 && p.limit() <=
RuntimeFilterConsumer::_state->batch_size()) {
properties[ESScanReader::KEY_TERMINATE_AFTER] =
std::to_string(p.limit());
}
@@ -95,12 +94,11 @@ Status
EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* sca
properties, p._column_names, p._docvalue_context,
&doc_value_mode);
std::shared_ptr<vectorized::NewEsScanner> scanner =
vectorized::NewEsScanner::create_shared(
- vectorized::RuntimeFilterConsumer::_state, this,
p._limit_per_scanner, p._tuple_id,
- properties, p._docvalue_context, doc_value_mode,
- vectorized::RuntimeFilterConsumer::_state->runtime_profile());
+ RuntimeFilterConsumer::_state, this, p._limit_per_scanner,
p._tuple_id, properties,
+ p._docvalue_context, doc_value_mode,
+ RuntimeFilterConsumer::_state->runtime_profile());
- RETURN_IF_ERROR(
- scanner->prepare(vectorized::RuntimeFilterConsumer::_state,
Base::_conjuncts));
+ RETURN_IF_ERROR(scanner->prepare(RuntimeFilterConsumer::_state,
Base::_conjuncts));
scanners->push_back(scanner);
}
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index c77761b01bf..3ea10110084 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -289,10 +289,8 @@ Status
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
auto with_other_conjuncts) -> Status {
using HashTableCtxType = std::decay_t<decltype(arg)>;
using JoinOpType = std::decay_t<decltype(join_op)>;
- vectorized::ProcessHashTableBuild<HashTableCtxType,
-
HashJoinBuildSinkLocalState>
- hash_table_build_process(rows, raw_ptrs, this,
state->batch_size(),
- state);
+ ProcessHashTableBuild<HashTableCtxType>
hash_table_build_process(
+ rows, raw_ptrs, this, state->batch_size(),
state);
auto old_hash_table_size =
arg.hash_table->get_byte_size();
auto old_key_size = arg.serialized_keys_size(true);
auto st = hash_table_build_process.template run<
@@ -338,26 +336,22 @@ void
HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
switch (_build_expr_ctxs[0]->root()->result_type()) {
case TYPE_BOOLEAN:
case TYPE_TINYINT:
- _shared_state->hash_table_variants
- ->emplace<vectorized::I8HashTableContext>();
+
_shared_state->hash_table_variants->emplace<I8HashTableContext>();
break;
case TYPE_SMALLINT:
- _shared_state->hash_table_variants
- ->emplace<vectorized::I16HashTableContext>();
+
_shared_state->hash_table_variants->emplace<I16HashTableContext>();
break;
case TYPE_INT:
case TYPE_FLOAT:
case TYPE_DATEV2:
- _shared_state->hash_table_variants
- ->emplace<vectorized::I32HashTableContext>();
+
_shared_state->hash_table_variants->emplace<I32HashTableContext>();
break;
case TYPE_BIGINT:
case TYPE_DOUBLE:
case TYPE_DATETIME:
case TYPE_DATE:
case TYPE_DATETIMEV2:
- _shared_state->hash_table_variants
- ->emplace<vectorized::I64HashTableContext>();
+
_shared_state->hash_table_variants->emplace<I64HashTableContext>();
break;
case TYPE_LARGEINT:
case TYPE_DECIMALV2:
@@ -375,14 +369,11 @@ void
HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
: type_ptr->get_type_id();
vectorized::WhichDataType which(idx);
if (which.is_decimal32()) {
- _shared_state->hash_table_variants
-
->emplace<vectorized::I32HashTableContext>();
+
_shared_state->hash_table_variants->emplace<I32HashTableContext>();
} else if (which.is_decimal64()) {
- _shared_state->hash_table_variants
-
->emplace<vectorized::I64HashTableContext>();
+
_shared_state->hash_table_variants->emplace<I64HashTableContext>();
} else {
- _shared_state->hash_table_variants
-
->emplace<vectorized::I128HashTableContext>();
+
_shared_state->hash_table_variants->emplace<I128HashTableContext>();
}
break;
}
@@ -606,7 +597,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
}
},
*local_state._shared_state->hash_table_variants,
- *std::static_pointer_cast<vectorized::HashTableVariants>(
+ *std::static_pointer_cast<HashTableVariants>(
_shared_hash_table_context->hash_table_variants));
local_state._shared_state->build_block =
_shared_hash_table_context->block;
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index fb66c5222ab..d785c20ee7f 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -17,8 +17,7 @@
#pragma once
-#include <stdint.h>
-
+#include "exprs/runtime_filter_slots.h"
#include "join_build_sink_operator.h"
#include "operator.h"
@@ -67,8 +66,8 @@ protected:
const std::vector<int>& res_col_ids);
friend class HashJoinBuildSinkOperatorX;
friend class PartitionedHashJoinSinkLocalState;
- template <class HashTableContext, typename Parent>
- friend struct vectorized::ProcessHashTableBuild;
+ template <class HashTableContext>
+ friend struct ProcessHashTableBuild;
// build expr
vectorized::VExprContextSPtrs _build_expr_ctxs;
@@ -176,4 +175,57 @@ private:
const bool _need_local_merge;
};
+template <class HashTableContext>
+struct ProcessHashTableBuild {
+ ProcessHashTableBuild(int rows, vectorized::ColumnRawPtrs& build_raw_ptrs,
+ HashJoinBuildSinkLocalState* parent, int batch_size,
RuntimeState* state)
+ : _rows(rows),
+ _build_raw_ptrs(build_raw_ptrs),
+ _parent(parent),
+ _batch_size(batch_size),
+ _state(state) {}
+
+ template <int JoinOpType, bool ignore_null, bool short_circuit_for_null,
+ bool with_other_conjuncts>
+ Status run(HashTableContext& hash_table_ctx, vectorized::ConstNullMapPtr
null_map,
+ bool* has_null_key) {
+ if (short_circuit_for_null || ignore_null) {
+ // first row is mocked and is null
+ for (uint32_t i = 1; i < _rows; i++) {
+ if ((*null_map)[i]) {
+ *has_null_key = true;
+ }
+ }
+ if (short_circuit_for_null && *has_null_key) {
+ return Status::OK();
+ }
+ }
+
+ SCOPED_TIMER(_parent->_build_table_insert_timer);
+ hash_table_ctx.hash_table->template prepare_build<JoinOpType>(_rows,
_batch_size,
+
*has_null_key);
+
+ hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
+ null_map ? null_map->data() :
nullptr, true, true,
+
hash_table_ctx.hash_table->get_bucket_size());
+ hash_table_ctx.hash_table->template build<JoinOpType,
with_other_conjuncts>(
+ hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(), _rows);
+ hash_table_ctx.bucket_nums.resize(_batch_size);
+ hash_table_ctx.bucket_nums.shrink_to_fit();
+
+ COUNTER_SET(_parent->_hash_table_memory_usage,
+ (int64_t)hash_table_ctx.hash_table->get_byte_size());
+ COUNTER_SET(_parent->_build_arena_memory_usage,
+ (int64_t)hash_table_ctx.serialized_keys_size(true));
+ return Status::OK();
+ }
+
+private:
+ const uint32_t _rows;
+ vectorized::ColumnRawPtrs& _build_raw_ptrs;
+ HashJoinBuildSinkLocalState* _parent = nullptr;
+ int _batch_size;
+ RuntimeState* _state = nullptr;
+};
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 56630efba3a..002ead551f6 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -73,9 +73,8 @@ Status HashJoinProbeLocalState::open(RuntimeState* state) {
std::visit(
[&](auto&& join_op_variants, auto have_other_join_conjunct) {
using JoinOpType = std::decay_t<decltype(join_op_variants)>;
- _process_hashtable_ctx_variants
-
->emplace<vectorized::ProcessHashTableProbe<JoinOpType::value>>(
- this, state->batch_size());
+
_process_hashtable_ctx_variants->emplace<ProcessHashTableProbe<JoinOpType::value>>(
+ this, state->batch_size());
},
_shared_state->join_op_variants,
vectorized::make_bool_variant(p._have_other_join_conjunct));
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 310cf52fec6..028b0583167 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -30,17 +30,17 @@ namespace pipeline {
class HashJoinProbeLocalState;
using HashTableCtxVariants =
- std::variant<std::monostate,
vectorized::ProcessHashTableProbe<TJoinOp::INNER_JOIN>,
-
vectorized::ProcessHashTableProbe<TJoinOp::LEFT_SEMI_JOIN>,
-
vectorized::ProcessHashTableProbe<TJoinOp::LEFT_ANTI_JOIN>,
-
vectorized::ProcessHashTableProbe<TJoinOp::LEFT_OUTER_JOIN>,
-
vectorized::ProcessHashTableProbe<TJoinOp::FULL_OUTER_JOIN>,
-
vectorized::ProcessHashTableProbe<TJoinOp::RIGHT_OUTER_JOIN>,
- vectorized::ProcessHashTableProbe<TJoinOp::CROSS_JOIN>,
-
vectorized::ProcessHashTableProbe<TJoinOp::RIGHT_SEMI_JOIN>,
-
vectorized::ProcessHashTableProbe<TJoinOp::RIGHT_ANTI_JOIN>,
-
vectorized::ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>,
-
vectorized::ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN>>;
+ std::variant<std::monostate,
ProcessHashTableProbe<TJoinOp::INNER_JOIN>,
+ ProcessHashTableProbe<TJoinOp::LEFT_SEMI_JOIN>,
+ ProcessHashTableProbe<TJoinOp::LEFT_ANTI_JOIN>,
+ ProcessHashTableProbe<TJoinOp::LEFT_OUTER_JOIN>,
+ ProcessHashTableProbe<TJoinOp::FULL_OUTER_JOIN>,
+ ProcessHashTableProbe<TJoinOp::RIGHT_OUTER_JOIN>,
+ ProcessHashTableProbe<TJoinOp::CROSS_JOIN>,
+ ProcessHashTableProbe<TJoinOp::RIGHT_SEMI_JOIN>,
+ ProcessHashTableProbe<TJoinOp::RIGHT_ANTI_JOIN>,
+ ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>,
+
ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN>>;
class HashJoinProbeOperatorX;
class HashJoinProbeLocalState final
@@ -85,7 +85,7 @@ private:
Status _extract_join_column(vectorized::Block& block, const
std::vector<int>& res_col_ids);
friend class HashJoinProbeOperatorX;
template <int JoinOpType>
- friend struct vectorized::ProcessHashTableProbe;
+ friend struct ProcessHashTableProbe;
int _probe_index = -1;
uint32_t _build_index = 0;
diff --git a/be/src/vec/exec/join/cross_join_impl.cpp
b/be/src/pipeline/exec/join/cross_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/cross_join_impl.cpp
rename to be/src/pipeline/exec/join/cross_join_impl.cpp
index 74fbba20277..16c4bf7f583 100644
--- a/be/src/vec/exec/join/cross_join_impl.cpp
+++ b/be/src/pipeline/exec/join/cross_join_impl.cpp
@@ -19,7 +19,7 @@
#include "process_hash_table_probe_impl.h"
-namespace doris::vectorized {
+namespace doris::pipeline {
INSTANTIATION_FOR(TJoinOp::CROSS_JOIN);
diff --git a/be/src/vec/exec/join/full_outer_join_impl.cpp
b/be/src/pipeline/exec/join/full_outer_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/full_outer_join_impl.cpp
rename to be/src/pipeline/exec/join/full_outer_join_impl.cpp
index 27868390970..0e1ddb58642 100644
--- a/be/src/vec/exec/join/full_outer_join_impl.cpp
+++ b/be/src/pipeline/exec/join/full_outer_join_impl.cpp
@@ -19,7 +19,7 @@
#include "process_hash_table_probe_impl.h"
-namespace doris::vectorized {
+namespace doris::pipeline {
INSTANTIATION_FOR(TJoinOp::FULL_OUTER_JOIN);
diff --git a/be/src/vec/exec/join/inner_join_impl.cpp
b/be/src/pipeline/exec/join/inner_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/inner_join_impl.cpp
rename to be/src/pipeline/exec/join/inner_join_impl.cpp
index 6c1e0d399a5..21767ecafa0 100644
--- a/be/src/vec/exec/join/inner_join_impl.cpp
+++ b/be/src/pipeline/exec/join/inner_join_impl.cpp
@@ -19,7 +19,7 @@
#include "process_hash_table_probe_impl.h"
-namespace doris::vectorized {
+namespace doris::pipeline {
INSTANTIATION_FOR(TJoinOp::INNER_JOIN);
diff --git a/be/src/vec/exec/join/join_op.h
b/be/src/pipeline/exec/join/join_op.h
similarity index 92%
rename from be/src/vec/exec/join/join_op.h
rename to be/src/pipeline/exec/join/join_op.h
index 62569270d9e..616753b72de 100644
--- a/be/src/vec/exec/join/join_op.h
+++ b/be/src/pipeline/exec/join/join_op.h
@@ -20,7 +20,7 @@
#include "vec/common/columns_hashing.h"
#include "vec/core/block.h"
-namespace doris::vectorized {
+namespace doris::pipeline {
/**
* Now we have different kinds of RowRef for join operation. Overall, RowRef
is the base class and
* the class inheritance is below:
@@ -72,7 +72,7 @@ struct Batch {
bool full() const { return size == MAX_SIZE; }
- Batch<RowRefType>* insert(RowRefType&& row_ref, Arena& pool) {
+ Batch<RowRefType>* insert(RowRefType&& row_ref, vectorized::Arena& pool) {
if (full()) {
auto batch = pool.alloc<Batch<RowRefType>>();
*batch = Batch<RowRefType>(this);
@@ -132,7 +132,9 @@ struct RowRefList : RowRef {
ForwardIterator<RowRefList> begin() { return
ForwardIterator<RowRefList>(this); }
/// insert element after current one
- void insert(RowRefType&& row_ref, Arena& pool) {
next.emplace_back(std::move(row_ref)); }
+ void insert(RowRefType&& row_ref, vectorized::Arena& pool) {
+ next.emplace_back(std::move(row_ref));
+ }
void clear() { next.clear(); }
@@ -152,7 +154,7 @@ struct RowRefListWithFlag : RowRef {
}
/// insert element after current one
- void insert(RowRefType&& row_ref, Arena& pool) {
next.emplace_back(row_ref); }
+ void insert(RowRefType&& row_ref, vectorized::Arena& pool) {
next.emplace_back(row_ref); }
void clear() { next.clear(); }
@@ -174,7 +176,7 @@ struct RowRefListWithFlags : RowRefWithFlag {
}
/// insert element after current one
- void insert(RowRefType&& row_ref, Arena& pool) {
next.emplace_back(row_ref); }
+ void insert(RowRefType&& row_ref, vectorized::Arena& pool) {
next.emplace_back(row_ref); }
void clear() { next.clear(); }
@@ -183,4 +185,4 @@ private:
std::vector<RowRefType> next;
};
-} // namespace doris::vectorized
+} // namespace doris::pipeline
diff --git a/be/src/vec/exec/join/left_anti_join_impl.cpp
b/be/src/pipeline/exec/join/left_anti_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/left_anti_join_impl.cpp
rename to be/src/pipeline/exec/join/left_anti_join_impl.cpp
index e1e1037eeab..ab6a45442b2 100644
--- a/be/src/vec/exec/join/left_anti_join_impl.cpp
+++ b/be/src/pipeline/exec/join/left_anti_join_impl.cpp
@@ -19,7 +19,7 @@
#include "process_hash_table_probe_impl.h"
-namespace doris::vectorized {
+namespace doris::pipeline {
INSTANTIATION_FOR(TJoinOp::LEFT_ANTI_JOIN);
diff --git a/be/src/vec/exec/join/left_outer_join_impl.cpp
b/be/src/pipeline/exec/join/left_outer_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/left_outer_join_impl.cpp
rename to be/src/pipeline/exec/join/left_outer_join_impl.cpp
index 681bd2b8164..4c8e2c5ac9f 100644
--- a/be/src/vec/exec/join/left_outer_join_impl.cpp
+++ b/be/src/pipeline/exec/join/left_outer_join_impl.cpp
@@ -19,7 +19,7 @@
#include "process_hash_table_probe_impl.h"
-namespace doris::vectorized {
+namespace doris::pipeline {
INSTANTIATION_FOR(TJoinOp::LEFT_OUTER_JOIN);
diff --git a/be/src/vec/exec/join/left_semi_join_impl.cpp
b/be/src/pipeline/exec/join/left_semi_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/left_semi_join_impl.cpp
rename to be/src/pipeline/exec/join/left_semi_join_impl.cpp
index d06fe1783f6..8365403abfe 100644
--- a/be/src/vec/exec/join/left_semi_join_impl.cpp
+++ b/be/src/pipeline/exec/join/left_semi_join_impl.cpp
@@ -19,7 +19,7 @@
#include "process_hash_table_probe_impl.h"
-namespace doris::vectorized {
+namespace doris::pipeline {
INSTANTIATION_FOR(TJoinOp::LEFT_SEMI_JOIN);
diff --git a/be/src/vec/exec/join/null_aware_left_anti_join_impl.cpp
b/be/src/pipeline/exec/join/null_aware_left_anti_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/null_aware_left_anti_join_impl.cpp
rename to be/src/pipeline/exec/join/null_aware_left_anti_join_impl.cpp
index 8b541d37495..834335f282f 100644
--- a/be/src/vec/exec/join/null_aware_left_anti_join_impl.cpp
+++ b/be/src/pipeline/exec/join/null_aware_left_anti_join_impl.cpp
@@ -19,7 +19,7 @@
#include "process_hash_table_probe_impl.h"
-namespace doris::vectorized {
+namespace doris::pipeline {
INSTANTIATION_FOR(TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
diff --git a/be/src/vec/exec/join/null_aware_left_semi_join_impl.cpp
b/be/src/pipeline/exec/join/null_aware_left_semi_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/null_aware_left_semi_join_impl.cpp
rename to be/src/pipeline/exec/join/null_aware_left_semi_join_impl.cpp
index 98d39e61479..b079a0ec95f 100644
--- a/be/src/vec/exec/join/null_aware_left_semi_join_impl.cpp
+++ b/be/src/pipeline/exec/join/null_aware_left_semi_join_impl.cpp
@@ -19,7 +19,7 @@
#include "process_hash_table_probe_impl.h"
-namespace doris::vectorized {
+namespace doris::pipeline {
INSTANTIATION_FOR(TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN);
diff --git a/be/src/vec/exec/join/process_hash_table_probe.h
b/be/src/pipeline/exec/join/process_hash_table_probe.h
similarity index 73%
rename from be/src/vec/exec/join/process_hash_table_probe.h
rename to be/src/pipeline/exec/join/process_hash_table_probe.h
index 11df66d4aaa..965d62192b2 100644
--- a/be/src/vec/exec/join/process_hash_table_probe.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe.h
@@ -25,38 +25,40 @@
#include "vec/common/arena.h"
namespace doris {
-namespace pipeline {
-class HashJoinProbeLocalState;
-}
namespace vectorized {
-
class Block;
class MutableBlock;
struct HashJoinProbeContext;
+} // namespace vectorized
+namespace pipeline {
+
+class HashJoinProbeLocalState;
-using MutableColumnPtr = IColumn::MutablePtr;
-using MutableColumns = std::vector<MutableColumnPtr>;
+using MutableColumnPtr = vectorized::IColumn::MutablePtr;
+using MutableColumns = std::vector<vectorized::MutableColumnPtr>;
-using NullMap = ColumnUInt8::Container;
-using ConstNullMapPtr = const NullMap*;
+using NullMap = vectorized::ColumnUInt8::Container;
+using ConstNullMapPtr = const vectorized::NullMap*;
template <int JoinOpType>
struct ProcessHashTableProbe {
- ProcessHashTableProbe(pipeline::HashJoinProbeLocalState* parent, int
batch_size);
+ ProcessHashTableProbe(HashJoinProbeLocalState* parent, int batch_size);
~ProcessHashTableProbe() = default;
// output build side result column
- void build_side_output_column(MutableColumns& mcol, const
std::vector<bool>& output_slot_flags,
- int size, bool have_other_join_conjunct,
bool is_mark_join);
+ void build_side_output_column(vectorized::MutableColumns& mcol,
+ const std::vector<bool>& output_slot_flags,
int size,
+ bool have_other_join_conjunct, bool
is_mark_join);
- void probe_side_output_column(MutableColumns& mcol, const
std::vector<bool>& output_slot_flags,
- int size, int last_probe_index, bool
all_match_one,
+ void probe_side_output_column(vectorized::MutableColumns& mcol,
+ const std::vector<bool>& output_slot_flags,
int size,
+ int last_probe_index, bool all_match_one,
bool have_other_join_conjunct);
template <bool need_null_map_for_probe, bool ignore_null, typename
HashTableType>
Status process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map,
- MutableBlock& mutable_block, Block* output_block, size_t
probe_rows,
- bool is_mark_join, bool have_other_join_conjunct);
+ vectorized::MutableBlock& mutable_block, vectorized::Block*
output_block,
+ size_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct);
// Only process the join with no other join conjunct, because of no other
join conjunt
// the output block struct is same with mutable block. we can do more opt
on it and simplify
@@ -65,16 +67,17 @@ struct ProcessHashTableProbe {
template <bool need_null_map_for_probe, bool ignore_null, typename
HashTableType,
bool with_other_conjuncts, bool is_mark_join>
Status do_process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map,
- MutableBlock& mutable_block, Block* output_block, size_t
probe_rows);
+ vectorized::MutableBlock& mutable_block,
vectorized::Block* output_block,
+ size_t probe_rows);
// In the presence of other join conjunct, the process of join become more
complicated.
// each matching join column need to be processed by other join conjunct.
so the struct of mutable block
// and output block may be different
// The output result is determined by the other join conjunct result and
same_to_prev struct
- Status do_other_join_conjuncts(Block* output_block, std::vector<uint8_t>&
visited,
+ Status do_other_join_conjuncts(vectorized::Block* output_block,
std::vector<uint8_t>& visited,
bool has_null_in_build_side);
template <bool with_other_conjuncts>
- Status do_mark_join_conjuncts(Block* output_block, size_t
hash_table_bucket_size);
+ Status do_mark_join_conjuncts(vectorized::Block* output_block, size_t
hash_table_bucket_size);
template <typename HashTableType>
typename HashTableType::State _init_probe_side(HashTableType&
hash_table_ctx, size_t probe_rows,
@@ -84,8 +87,9 @@ struct ProcessHashTableProbe {
// Process full outer join/ right join / right semi/anti join to output
the join result
// in hash table
template <typename HashTableType>
- Status process_data_in_hashtable(HashTableType& hash_table_ctx,
MutableBlock& mutable_block,
- Block* output_block, bool* eos, bool
is_mark_join);
+ Status process_data_in_hashtable(HashTableType& hash_table_ctx,
+ vectorized::MutableBlock& mutable_block,
+ vectorized::Block* output_block, bool*
eos, bool is_mark_join);
/// For null aware join with other conjuncts, if the probe key of one row
on left side is null,
/// we should make this row match with all rows in build side.
@@ -93,8 +97,8 @@ struct ProcessHashTableProbe {
pipeline::HashJoinProbeLocalState* _parent = nullptr;
const int _batch_size;
- const std::shared_ptr<Block>& _build_block;
- std::unique_ptr<Arena> _arena;
+ const std::shared_ptr<vectorized::Block>& _build_block;
+ std::unique_ptr<vectorized::Arena> _arena;
std::vector<StringRef> _probe_keys;
std::vector<uint32_t> _probe_indexs;
@@ -110,13 +114,13 @@ struct ProcessHashTableProbe {
std::vector<int> _build_blocks_locs;
// only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN
- ColumnUInt8::Container* _tuple_is_null_left_flags = nullptr;
+ vectorized::ColumnUInt8::Container* _tuple_is_null_left_flags = nullptr;
// only need set the tuple is null in LEFT_OUTER_JOIN and FULL_OUTER_JOIN
- ColumnUInt8::Container* _tuple_is_null_right_flags = nullptr;
+ vectorized::ColumnUInt8::Container* _tuple_is_null_right_flags = nullptr;
size_t _serialized_key_buffer_size {0};
uint8_t* _serialized_key_buffer = nullptr;
- std::unique_ptr<Arena> _serialize_key_arena;
+ std::unique_ptr<vectorized::Arena> _serialize_key_arena;
std::vector<char> _probe_side_find_result;
bool _have_other_join_conjunct;
@@ -139,5 +143,5 @@ struct ProcessHashTableProbe {
int _right_col_len;
};
-} // namespace vectorized
+} // namespace pipeline
} // namespace doris
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h
b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
similarity index 83%
rename from be/src/vec/exec/join/process_hash_table_probe_impl.h
rename to be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 8654f17591c..5e023f2c861 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -27,23 +27,22 @@
#include "vec/columns/column_filter_helper.h"
#include "vec/columns/column_nullable.h"
#include "vec/exprs/vexpr_context.h"
-#include "vhash_join_node.h"
-namespace doris::vectorized {
+namespace doris::pipeline {
template <int JoinOpType>
-ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(pipeline::HashJoinProbeLocalState*
parent,
+ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeLocalState*
parent,
int batch_size)
: _parent(parent),
_batch_size(batch_size),
_build_block(parent->build_block()),
_tuple_is_null_left_flags(parent->is_outer_join()
- ? &(reinterpret_cast<ColumnUInt8&>(
+ ?
&(reinterpret_cast<vectorized::ColumnUInt8&>(
*parent->_tuple_is_null_left_flag_column)
.get_data())
: nullptr),
_tuple_is_null_right_flags(parent->is_outer_join()
- ?
&(reinterpret_cast<ColumnUInt8&>(
+ ?
&(reinterpret_cast<vectorized::ColumnUInt8&>(
*parent->_tuple_is_null_right_flag_column)
.get_data())
: nullptr),
@@ -65,7 +64,7 @@
ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(pipeline::HashJoinProbe
template <int JoinOpType>
void ProcessHashTableProbe<JoinOpType>::build_side_output_column(
- MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int
size,
+ vectorized::MutableColumns& mcol, const std::vector<bool>&
output_slot_flags, int size,
bool have_other_join_conjunct, bool is_mark_join) {
SCOPED_TIMER(_build_side_output_timer);
constexpr auto is_semi_anti_join = JoinOpType == TJoinOp::RIGHT_ANTI_JOIN
||
@@ -104,7 +103,7 @@ void
ProcessHashTableProbe<JoinOpType>::build_side_output_column(
const auto& column =
*_build_block->safe_get_by_position(i).column;
_build_column_has_null[i] = false;
if (output_slot_flags[i] && column.is_nullable()) {
- const auto& nullable = assert_cast<const
ColumnNullable&>(column);
+ const auto& nullable = assert_cast<const
vectorized::ColumnNullable&>(column);
_build_column_has_null[i] = !simd::contain_byte(
nullable.get_null_map_data().data() + 1,
nullable.size() - 1, 1);
_need_calculate_build_index_has_zero |=
_build_column_has_null[i];
@@ -116,7 +115,7 @@ void
ProcessHashTableProbe<JoinOpType>::build_side_output_column(
const auto& column = *_build_block->safe_get_by_position(i).column;
if (output_slot_flags[i]) {
if (!build_index_has_zero && _build_column_has_null[i]) {
- assert_cast<ColumnNullable*>(mcol[i +
_right_col_idx].get())
+ assert_cast<vectorized::ColumnNullable*>(mcol[i +
_right_col_idx].get())
->insert_indices_from_not_has_null(column,
_build_indexs.data(),
_build_indexs.data() + size);
} else {
@@ -132,7 +131,7 @@ void
ProcessHashTableProbe<JoinOpType>::build_side_output_column(
template <int JoinOpType>
void ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
- MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int
size,
+ vectorized::MutableColumns& mcol, const std::vector<bool>&
output_slot_flags, int size,
int last_probe_index, bool all_match_one, bool
have_other_join_conjunct) {
SCOPED_TIMER(_probe_side_output_timer);
auto& probe_block = _parent->_probe_block;
@@ -189,9 +188,10 @@ template <int JoinOpType>
template <bool need_null_map_for_probe, bool ignore_null, typename
HashTableType,
bool with_other_conjuncts, bool is_mark_join>
Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType&
hash_table_ctx,
- ConstNullMapPtr null_map,
- MutableBlock&
mutable_block,
- Block* output_block,
size_t probe_rows) {
+
vectorized::ConstNullMapPtr null_map,
+ vectorized::MutableBlock&
mutable_block,
+ vectorized::Block*
output_block,
+ size_t probe_rows) {
if (_right_col_len && !_build_block) {
return Status::InternalError("build block is nullptr");
}
@@ -361,7 +361,7 @@ size_t
ProcessHashTableProbe<JoinOpType>::_process_probe_null_key(uint32_t probe
*/
template <int JoinOpType>
template <bool with_other_conjuncts>
-Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(Block*
output_block,
+Status
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Block*
output_block,
size_t
hash_table_bucket_size) {
DCHECK(JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
@@ -380,8 +380,9 @@ Status
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(Block* output_b
auto mark_column_mutable =
output_block->get_by_position(_parent->_mark_column_id).column->assume_mutable();
- auto& mark_column = assert_cast<ColumnNullable&>(*mark_column_mutable);
- IColumn::Filter& filter =
assert_cast<ColumnUInt8&>(mark_column.get_nested_column()).get_data();
+ auto& mark_column =
assert_cast<vectorized::ColumnNullable&>(*mark_column_mutable);
+ vectorized::IColumn::Filter& filter =
+
assert_cast<vectorized::ColumnUInt8&>(mark_column.get_nested_column()).get_data();
if (_parent->_mark_join_conjuncts.empty()) {
// For null aware anti/semi join, if the equal conjuncts was not
matched and the build side has null value,
@@ -391,8 +392,9 @@ Status
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(Block* output_b
const bool should_be_null_if_build_side_has_null =
*_has_null_in_build_side;
mark_column.resize(row_count);
- auto* filter_data =
-
assert_cast<ColumnUInt8&>(mark_column.get_nested_column()).get_data().data();
+ auto* filter_data =
assert_cast<vectorized::ColumnUInt8&>(mark_column.get_nested_column())
+ .get_data()
+ .data();
auto* mark_null_map = mark_column.get_null_map_data().data();
int last_probe_matched = -1;
for (size_t i = 0; i != row_count; ++i) {
@@ -417,20 +419,21 @@ Status
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(Block* output_b
memset(mark_null_map, 0, row_count);
}
} else {
-
RETURN_IF_ERROR(VExprContext::execute_conjuncts(_parent->_mark_join_conjuncts,
output_block,
-
mark_column.get_null_map_column(), filter));
+ RETURN_IF_ERROR(vectorized::VExprContext::execute_conjuncts(
+ _parent->_mark_join_conjuncts, output_block,
mark_column.get_null_map_column(),
+ filter));
}
auto* mark_null_map = mark_column.get_null_map_data().data();
auto* mark_filter_data = filter.data();
if constexpr (with_other_conjuncts) {
- IColumn::Filter other_conjunct_filter(row_count, 1);
+ vectorized::IColumn::Filter other_conjunct_filter(row_count, 1);
{
bool can_be_filter_all = false;
-
RETURN_IF_ERROR(VExprContext::execute_conjuncts(_parent->_other_join_conjuncts,
nullptr,
- output_block,
&other_conjunct_filter,
-
&can_be_filter_all));
+ RETURN_IF_ERROR(vectorized::VExprContext::execute_conjuncts(
+ _parent->_other_join_conjuncts, nullptr, output_block,
&other_conjunct_filter,
+ &can_be_filter_all));
}
DCHECK_EQ(filter.size(), other_conjunct_filter.size());
const auto* other_filter_data = other_conjunct_filter.data();
@@ -444,7 +447,7 @@ Status
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(Block* output_b
}
}
- auto filter_column = ColumnUInt8::create(row_count, 0);
+ auto filter_column = vectorized::ColumnUInt8::create(row_count, 0);
auto* __restrict filter_map = filter_column->get_data().data();
/**
@@ -483,12 +486,13 @@ Status
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(Block* output_b
}
auto result_column_id = output_block->columns();
- output_block->insert({std::move(filter_column),
std::make_shared<DataTypeUInt8>(), ""});
- return Block::filter_block(output_block, result_column_id,
result_column_id);
+ output_block->insert(
+ {std::move(filter_column),
std::make_shared<vectorized::DataTypeUInt8>(), ""});
+ return vectorized::Block::filter_block(output_block, result_column_id,
result_column_id);
}
template <int JoinOpType>
-Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(Block*
output_block,
+Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Block*
output_block,
std::vector<uint8_t>& visited,
bool
has_null_in_build_side) {
// dispose the other join conjunct exec
@@ -499,27 +503,28 @@ Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(Block* output_
SCOPED_TIMER(_parent->_process_other_join_conjunct_timer);
int orig_columns = output_block->columns();
- IColumn::Filter other_conjunct_filter(row_count, 1);
+ vectorized::IColumn::Filter other_conjunct_filter(row_count, 1);
{
bool can_be_filter_all = false;
-
RETURN_IF_ERROR(VExprContext::execute_conjuncts(_parent->_other_join_conjuncts,
nullptr,
- output_block,
&other_conjunct_filter,
- &can_be_filter_all));
+ RETURN_IF_ERROR(vectorized::VExprContext::execute_conjuncts(
+ _parent->_other_join_conjuncts, nullptr, output_block,
&other_conjunct_filter,
+ &can_be_filter_all));
}
- auto filter_column = ColumnUInt8::create();
+ auto filter_column = vectorized::ColumnUInt8::create();
filter_column->get_data() = std::move(other_conjunct_filter);
auto result_column_id = output_block->columns();
- output_block->insert({std::move(filter_column),
std::make_shared<DataTypeUInt8>(), ""});
+ output_block->insert(
+ {std::move(filter_column),
std::make_shared<vectorized::DataTypeUInt8>(), ""});
uint8_t* __restrict filter_column_ptr =
- assert_cast<ColumnUInt8&>(
+ assert_cast<vectorized::ColumnUInt8&>(
output_block->get_by_position(result_column_id).column->assume_mutable_ref())
.get_data()
.data();
if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN ||
JoinOpType == TJoinOp::FULL_OUTER_JOIN) {
- auto new_filter_column = ColumnUInt8::create(row_count);
+ auto new_filter_column = vectorized::ColumnUInt8::create(row_count);
auto* __restrict filter_map = new_filter_column->get_data().data();
// process equal-conjuncts-matched tuples that are newly generated
@@ -551,7 +556,7 @@ Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(Block* output_
} else if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::LEFT_SEMI_JOIN) {
- auto new_filter_column = ColumnUInt8::create(row_count);
+ auto new_filter_column = vectorized::ColumnUInt8::create(row_count);
auto* __restrict filter_map = new_filter_column->get_data().data();
for (size_t i = 0; i < row_count; ++i) {
@@ -602,7 +607,8 @@ Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(Block* output_
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
orig_columns = _right_col_idx;
}
- RETURN_IF_ERROR(Block::filter_block(output_block, result_column_id,
orig_columns));
+ RETURN_IF_ERROR(
+ vectorized::Block::filter_block(output_block,
result_column_id, orig_columns));
}
return Status::OK();
@@ -610,15 +616,14 @@ Status
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(Block* output_
template <int JoinOpType>
template <typename HashTableType>
-Status
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableType&
hash_table_ctx,
-
MutableBlock& mutable_block,
- Block*
output_block, bool* eos,
- bool
is_mark_join) {
+Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(
+ HashTableType& hash_table_ctx, vectorized::MutableBlock& mutable_block,
+ vectorized::Block* output_block, bool* eos, bool is_mark_join) {
SCOPED_TIMER(_probe_process_hashtable_timer);
auto& mcol = mutable_block.mutable_columns();
if (is_mark_join) {
- std::unique_ptr<ColumnFilterHelper> mark_column =
- std::make_unique<ColumnFilterHelper>(*mcol[mcol.size() - 1]);
+ std::unique_ptr<vectorized::ColumnFilterHelper> mark_column =
+
std::make_unique<vectorized::ColumnFilterHelper>(*mcol[mcol.size() - 1]);
*eos = hash_table_ctx.hash_table->template iterate_map<JoinOpType,
true>(_build_indexs,
mark_column.get());
} else {
@@ -651,7 +656,8 @@ Status
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN ||
JoinOpType == TJoinOp::FULL_OUTER_JOIN) {
for (int i = 0; i < _right_col_idx; ++i) {
-
assert_cast<ColumnNullable*>(mcol[i].get())->insert_many_defaults(block_size);
+ assert_cast<vectorized::ColumnNullable*>(mcol[i].get())
+ ->insert_many_defaults(block_size);
}
_tuple_is_null_left_flags->resize_fill(block_size, 1);
}
@@ -664,8 +670,9 @@ Status
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
template <int JoinOpType>
template <bool need_null_map_for_probe, bool ignore_null, typename
HashTableType>
Status ProcessHashTableProbe<JoinOpType>::process(HashTableType&
hash_table_ctx,
- ConstNullMapPtr null_map,
- MutableBlock& mutable_block,
Block* output_block,
+ vectorized::ConstNullMapPtr
null_map,
+ vectorized::MutableBlock&
mutable_block,
+ vectorized::Block*
output_block,
size_t probe_rows, bool
is_mark_join,
bool
have_other_join_conjunct) {
Status res;
@@ -675,7 +682,8 @@ Status
ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx,
have_other_join_conjunct, is_mark_join>(
hash_table_ctx, null_map, mutable_block, output_block,
probe_rows);
},
- make_bool_variant(is_mark_join),
make_bool_variant(have_other_join_conjunct));
+ vectorized::make_bool_variant(is_mark_join),
+ vectorized::make_bool_variant(have_other_join_conjunct));
return res;
}
@@ -687,50 +695,50 @@ struct ExtractType<T(U)> {
using Type = U;
};
-#define INSTANTIATION(JoinOpType, T)
\
- template Status
\
- ProcessHashTableProbe<JoinOpType>::process<false, false,
ExtractType<void(T)>::Type>( \
- ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr
null_map, \
- MutableBlock & mutable_block, Block * output_block, size_t
probe_rows, \
- bool is_mark_join, bool have_other_join_conjunct);
\
- template Status
\
- ProcessHashTableProbe<JoinOpType>::process<false, true,
ExtractType<void(T)>::Type>( \
- ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr
null_map, \
- MutableBlock & mutable_block, Block * output_block, size_t
probe_rows, \
- bool is_mark_join, bool have_other_join_conjunct);
\
- template Status
\
- ProcessHashTableProbe<JoinOpType>::process<true, false,
ExtractType<void(T)>::Type>( \
- ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr
null_map, \
- MutableBlock & mutable_block, Block * output_block, size_t
probe_rows, \
- bool is_mark_join, bool have_other_join_conjunct);
\
- template Status
\
- ProcessHashTableProbe<JoinOpType>::process<true, true,
ExtractType<void(T)>::Type>( \
- ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr
null_map, \
- MutableBlock & mutable_block, Block * output_block, size_t
probe_rows, \
- bool is_mark_join, bool have_other_join_conjunct);
\
-
\
- template Status
\
-
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable<ExtractType<void(T)>::Type>(
\
- ExtractType<void(T)>::Type & hash_table_ctx, MutableBlock &
mutable_block, \
- Block * output_block, bool* eos, bool is_mark_join);
-
-#define INSTANTIATION_FOR(JoinOpType) \
- template struct ProcessHashTableProbe<JoinOpType>; \
- \
- INSTANTIATION(JoinOpType, (SerializedHashTableContext)); \
- INSTANTIATION(JoinOpType, (I8HashTableContext)); \
- INSTANTIATION(JoinOpType, (I16HashTableContext)); \
- INSTANTIATION(JoinOpType, (I32HashTableContext)); \
- INSTANTIATION(JoinOpType, (I64HashTableContext)); \
- INSTANTIATION(JoinOpType, (I128HashTableContext)); \
- INSTANTIATION(JoinOpType, (I256HashTableContext)); \
- INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<true>)); \
- INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<false>)); \
- INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<true>)); \
- INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<false>)); \
- INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<true>)); \
- INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<false>)); \
- INSTANTIATION(JoinOpType, (I136FixedKeyHashTableContext<true>)); \
+#define INSTANTIATION(JoinOpType, T)
\
+ template Status
\
+ ProcessHashTableProbe<JoinOpType>::process<false, false,
ExtractType<void(T)>::Type>( \
+ ExtractType<void(T)>::Type & hash_table_ctx,
vectorized::ConstNullMapPtr null_map, \
+ vectorized::MutableBlock & mutable_block, vectorized::Block *
output_block, \
+ size_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct); \
+ template Status
\
+ ProcessHashTableProbe<JoinOpType>::process<false, true,
ExtractType<void(T)>::Type>( \
+ ExtractType<void(T)>::Type & hash_table_ctx,
vectorized::ConstNullMapPtr null_map, \
+ vectorized::MutableBlock & mutable_block, vectorized::Block *
output_block, \
+ size_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct); \
+ template Status
\
+ ProcessHashTableProbe<JoinOpType>::process<true, false,
ExtractType<void(T)>::Type>( \
+ ExtractType<void(T)>::Type & hash_table_ctx,
vectorized::ConstNullMapPtr null_map, \
+ vectorized::MutableBlock & mutable_block, vectorized::Block *
output_block, \
+ size_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct); \
+ template Status
\
+ ProcessHashTableProbe<JoinOpType>::process<true, true,
ExtractType<void(T)>::Type>( \
+ ExtractType<void(T)>::Type & hash_table_ctx,
vectorized::ConstNullMapPtr null_map, \
+ vectorized::MutableBlock & mutable_block, vectorized::Block *
output_block, \
+ size_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct); \
+
\
+ template Status
\
+
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable<ExtractType<void(T)>::Type>(
\
+ ExtractType<void(T)>::Type & hash_table_ctx,
vectorized::MutableBlock & mutable_block, \
+ vectorized::Block * output_block, bool* eos, bool is_mark_join);
+
+#define INSTANTIATION_FOR(JoinOpType) \
+ template struct ProcessHashTableProbe<JoinOpType>; \
+ \
+ INSTANTIATION(JoinOpType, (vectorized::SerializedHashTableContext)); \
+ INSTANTIATION(JoinOpType, (I8HashTableContext)); \
+ INSTANTIATION(JoinOpType, (I16HashTableContext)); \
+ INSTANTIATION(JoinOpType, (I32HashTableContext)); \
+ INSTANTIATION(JoinOpType, (I64HashTableContext)); \
+ INSTANTIATION(JoinOpType, (I128HashTableContext)); \
+ INSTANTIATION(JoinOpType, (I256HashTableContext)); \
+ INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<true>)); \
+ INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<false>)); \
+ INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<true>)); \
+ INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<false>)); \
+ INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<true>)); \
+ INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<false>)); \
+ INSTANTIATION(JoinOpType, (I136FixedKeyHashTableContext<true>)); \
INSTANTIATION(JoinOpType, (I136FixedKeyHashTableContext<false>));
-} // namespace doris::vectorized
+} // namespace doris::pipeline
diff --git a/be/src/vec/exec/join/right_anti_join_impl.cpp
b/be/src/pipeline/exec/join/right_anti_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/right_anti_join_impl.cpp
rename to be/src/pipeline/exec/join/right_anti_join_impl.cpp
index 1c4eec5cf76..e3cc943bf3b 100644
--- a/be/src/vec/exec/join/right_anti_join_impl.cpp
+++ b/be/src/pipeline/exec/join/right_anti_join_impl.cpp
@@ -19,7 +19,7 @@
#include "process_hash_table_probe_impl.h"
-namespace doris::vectorized {
+namespace doris::pipeline {
INSTANTIATION_FOR(TJoinOp::RIGHT_ANTI_JOIN);
diff --git a/be/src/vec/exec/join/right_outer_join_impl.cpp
b/be/src/pipeline/exec/join/right_outer_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/right_outer_join_impl.cpp
rename to be/src/pipeline/exec/join/right_outer_join_impl.cpp
index e732639610e..65c6eaaab18 100644
--- a/be/src/vec/exec/join/right_outer_join_impl.cpp
+++ b/be/src/pipeline/exec/join/right_outer_join_impl.cpp
@@ -19,7 +19,7 @@
#include "process_hash_table_probe_impl.h"
-namespace doris::vectorized {
+namespace doris::pipeline {
INSTANTIATION_FOR(TJoinOp::RIGHT_OUTER_JOIN);
diff --git a/be/src/vec/exec/join/right_semi_join_impl.cpp
b/be/src/pipeline/exec/join/right_semi_join_impl.cpp
similarity index 96%
rename from be/src/vec/exec/join/right_semi_join_impl.cpp
rename to be/src/pipeline/exec/join/right_semi_join_impl.cpp
index 0ee93734439..f89826d8845 100644
--- a/be/src/vec/exec/join/right_semi_join_impl.cpp
+++ b/be/src/pipeline/exec/join/right_semi_join_impl.cpp
@@ -19,7 +19,7 @@
#include "process_hash_table_probe_impl.h"
-namespace doris::vectorized {
+namespace doris::pipeline {
INSTANTIATION_FOR(TJoinOp::RIGHT_SEMI_JOIN);
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 03d0d380fcb..25bc28b5d43 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -28,10 +28,9 @@ namespace doris::pipeline {
MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(RuntimeState*
state,
OperatorXBase* parent)
: Base(state, parent),
-
vectorized::RuntimeFilterConsumer(static_cast<Parent*>(parent)->dest_id_from_sink(),
- parent->runtime_filter_descs(),
-
static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {
-}
+
RuntimeFilterConsumer(static_cast<Parent*>(parent)->dest_id_from_sink(),
+ parent->runtime_filter_descs(),
+ static_cast<Parent*>(parent)->_row_desc(),
_conjuncts) {}
Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index a772f1ca70f..8ecbd23764d 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -23,7 +23,7 @@
#include "common/status.h"
#include "operator.h"
-#include "vec/exec/runtime_filter_consumer.h"
+#include "pipeline/common/runtime_filter_consumer.h"
namespace doris {
class RuntimeState;
@@ -37,7 +37,7 @@ class MultiCastDataStreamer;
class MultiCastDataStreamerSourceOperatorX;
class MultiCastDataStreamSourceLocalState final : public
PipelineXLocalState<MultiCastSharedState>,
- public
vectorized::RuntimeFilterConsumer {
+ public RuntimeFilterConsumer
{
public:
ENABLE_FACTORY_CREATOR(MultiCastDataStreamSourceLocalState);
using Base = PipelineXLocalState<MultiCastSharedState>;
@@ -85,7 +85,6 @@ public:
Status prepare(RuntimeState* state) override {
RETURN_IF_ERROR(Base::prepare(state));
- // RETURN_IF_ERROR(vectorized::RuntimeFilterConsumer::init(state));
// init profile for runtime filter
//
RuntimeFilterConsumer::_init_profile(local_state._shared_state->_multi_cast_data_streamer->profile());
if (_t_data_stream_sink.__isset.output_exprs) {
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp
b/be/src/pipeline/exec/olap_scan_operator.cpp
index 5b458314d25..346540e03ec 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -27,10 +27,10 @@
#include "olap/parallel_scanner_builder.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
+#include "pipeline/common/runtime_filter_consumer.h"
#include "pipeline/exec/scan_operator.h"
#include "service/backend_options.h"
#include "util/to_string.h"
-#include "vec/exec/runtime_filter_consumer.h"
#include "vec/exec/scan/new_olap_scanner.h"
#include "vec/exec/scan/vscan_node.h"
#include "vec/exprs/vcompound_pred.h"
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index af3b0fa4077..e5e44498ec0 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -18,6 +18,7 @@
#pragma once
#include "aggregation_sink_operator.h"
#include "pipeline/exec/operator.h"
+#include "vec/exprs/vectorized_agg_fn.h"
#include "vec/exprs/vexpr.h"
#include "vec/spill/spill_stream_manager.h"
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 39e4b8dac26..3f1ce8e7bbb 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -22,6 +22,7 @@
#include <cstdint>
#include <memory>
+#include "pipeline/common/runtime_filter_consumer.h"
#include "pipeline/exec/es_scan_operator.h"
#include "pipeline/exec/file_scan_operator.h"
#include "pipeline/exec/group_commit_scan_operator.h"
@@ -30,7 +31,6 @@
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/operator.h"
#include "util/runtime_profile.h"
-#include "vec/exec/runtime_filter_consumer.h"
#include "vec/exprs/vcast_expr.h"
#include "vec/exprs/vcompound_pred.h"
#include "vec/exprs/vectorized_fn_call.h"
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index 120c39bfa43..84db26da051 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -23,10 +23,15 @@
#include <string>
#include "common/status.h"
+#include "exprs/function_filter.h"
#include "operator.h"
+#include "pipeline/common/runtime_filter_consumer.h"
#include "pipeline/dependency.h"
#include "runtime/descriptors.h"
#include "vec/exec/scan/vscan_node.h"
+#include "vec/exprs/vectorized_fn_call.h"
+#include "vec/exprs/vin_predicate.h"
+#include "vec/utils/util.hpp"
namespace doris::vectorized {
class ScannerDelegate;
@@ -55,12 +60,12 @@ struct FilterPredicates {
std::vector<std::pair<std::string, std::shared_ptr<HybridSetBase>>>
in_filters;
};
-class ScanLocalStateBase : public PipelineXLocalState<>, public
vectorized::RuntimeFilterConsumer {
+class ScanLocalStateBase : public PipelineXLocalState<>, public
RuntimeFilterConsumer {
public:
ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<>(state, parent),
- vectorized::RuntimeFilterConsumer(parent->node_id(),
parent->runtime_filter_descs(),
- parent->row_descriptor(),
_conjuncts) {}
+ RuntimeFilterConsumer(parent->node_id(),
parent->runtime_filter_descs(),
+ parent->row_descriptor(), _conjuncts) {}
~ScanLocalStateBase() override = default;
virtual bool ready_to_read() = 0;
diff --git a/be/src/pipeline/exec/set_source_operator.cpp
b/be/src/pipeline/exec/set_source_operator.cpp
index 0994350430b..c6a80f8d06c 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -144,7 +144,7 @@ Status
SetSourceOperatorX<is_intersect>::_get_data_in_hashtable(
template <bool is_intersect>
void SetSourceOperatorX<is_intersect>::_add_result_columns(
- SetSourceLocalState<is_intersect>& local_state,
vectorized::RowRefListWithFlags& value,
+ SetSourceLocalState<is_intersect>& local_state, RowRefListWithFlags&
value,
int& block_size) {
auto& build_col_idx = local_state._shared_state->build_col_idx;
auto& build_block = local_state._shared_state->build_block;
diff --git a/be/src/pipeline/exec/set_source_operator.h
b/be/src/pipeline/exec/set_source_operator.h
index 2bbc4257193..5157a2f9c97 100644
--- a/be/src/pipeline/exec/set_source_operator.h
+++ b/be/src/pipeline/exec/set_source_operator.h
@@ -81,7 +81,7 @@ private:
const int batch_size, bool* eos);
void _add_result_columns(SetSourceLocalState<is_intersect>& local_state,
- vectorized::RowRefListWithFlags& value, int&
block_size);
+ RowRefListWithFlags& value, int& block_size);
const int _child_quantity;
};
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index bea67b3050a..837a33dc437 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -24,6 +24,8 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "pipeline/exec/operator.h"
+#include "vec/exprs/vectorized_agg_fn.h"
+#include "vec/exprs/vslot_ref.h"
namespace doris {
class RuntimeState;
@@ -76,7 +78,7 @@ static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE =
StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state,
OperatorXBase* parent)
: Base(state, parent),
_agg_arena_pool(std::make_unique<vectorized::Arena>()),
- _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()),
+ _agg_data(std::make_unique<AggregatedDataVariants>()),
_agg_profile_arena(std::make_unique<vectorized::Arena>()),
_child_block(vectorized::Block::create_unique()),
_pre_aggregated_block(vectorized::Block::create_unique()) {}
@@ -164,13 +166,11 @@ Status StreamingAggLocalState::open(RuntimeState* state) {
using KeyType = typename HashTableType::Key;
/// some aggregate functions (like AVG for
decimal) have align issues.
- _aggregate_data_container =
-
std::make_unique<vectorized::AggregateDataContainer>(
- sizeof(KeyType),
-
((p._total_size_of_aggregate_states +
- p._align_aggregate_states -
1) /
- p._align_aggregate_states) *
-
p._align_aggregate_states);
+ _aggregate_data_container =
std::make_unique<AggregateDataContainer>(
+ sizeof(KeyType),
((p._total_size_of_aggregate_states +
+
p._align_aggregate_states - 1) /
+
p._align_aggregate_states) *
+
p._align_aggregate_states);
}},
_agg_data->method_variant);
if (p._is_merge) {
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index 227536170ea..bf856435982 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -108,12 +108,12 @@ private:
bool _should_expand_hash_table = true;
int64_t _cur_num_rows_returned = 0;
std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr;
- vectorized::AggregatedDataVariantsUPtr _agg_data = nullptr;
+ AggregatedDataVariantsUPtr _agg_data = nullptr;
std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
// group by k1,k2
vectorized::VExprContextSPtrs _probe_expr_ctxs;
std::unique_ptr<vectorized::Arena> _agg_profile_arena = nullptr;
- std::unique_ptr<vectorized::AggregateDataContainer>
_aggregate_data_container = nullptr;
+ std::unique_ptr<AggregateDataContainer> _aggregate_data_container =
nullptr;
bool _should_limit_output = false;
bool _reach_limit = false;
size_t _input_num_rows = 0;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index bb3ef4ff0f8..41ba68e8cbb 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -817,14 +817,6 @@ void FragmentMgr::cancel_query(const TUniqueId query_id,
const Status reason) {
return;
}
}
- if (query_ctx->enable_pipeline_x_exec()) {
- query_ctx->cancel_all_pipeline_context(reason);
- } else {
- for (auto it : all_instance_ids) {
- cancel_instance(it, reason);
- }
- }
-
query_ctx->cancel(reason);
{
std::lock_guard<std::mutex> state_lock(_lock);
@@ -862,7 +854,6 @@ void FragmentMgr::cancel_instance(const TUniqueId
instance_id, const Status reas
void FragmentMgr::cancel_worker() {
LOG(INFO) << "FragmentMgr cancel worker start working.";
do {
- std::vector<TUniqueId> to_cancel;
std::vector<TUniqueId> queries_lost_coordinator;
std::vector<TUniqueId> queries_timeout;
@@ -937,17 +928,6 @@ void FragmentMgr::cancel_worker() {
}
}
- // TODO(zhiqiang): It seems that timeout_canceled_fragment_count is
- // designed to count canceled fragment of non-pipeline query.
- timeout_canceled_fragment_count->increment(to_cancel.size());
- for (auto& id : to_cancel) {
- cancel_instance(id,
- Status::Error<ErrorCode::TIMEOUT>(
- "FragmentMgr cancel worker going to cancel
timeout instance "));
- LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout
instance "
- << print_id(id);
- }
-
if (!queries_lost_coordinator.empty()) {
LOG(INFO) << "There are " << queries_lost_coordinator.size()
<< " queries need to be cancelled, coordinator dead or
restarted.";
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index a8efd4d9392..44bdaa5971a 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -213,6 +213,10 @@ void QueryContext::cancel(Status new_status, int
fragment_id) {
}
set_ready_to_execute(new_status);
+ cancel_all_pipeline_context(new_status, fragment_id);
+}
+
+void QueryContext::cancel_all_pipeline_context(const Status& reason, int
fragment_id) {
std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>>
ctx_to_cancel;
{
std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
@@ -223,23 +227,6 @@ void QueryContext::cancel(Status new_status, int
fragment_id) {
ctx_to_cancel.push_back(f_context);
}
}
- // Must not add lock here. There maybe dead lock because it will call
fragment
- // ctx cancel and fragment ctx will call query ctx cancel.
- for (auto& f_context : ctx_to_cancel) {
- if (auto pipeline_ctx = f_context.lock()) {
- pipeline_ctx->cancel(new_status);
- }
- }
-}
-
-void QueryContext::cancel_all_pipeline_context(const Status& reason) {
- std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>>
ctx_to_cancel;
- {
- std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
- for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
- ctx_to_cancel.push_back(f_context);
- }
- }
for (auto& f_context : ctx_to_cancel) {
if (auto pipeline_ctx = f_context.lock()) {
pipeline_ctx->cancel(reason);
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index dc7ea7e29bf..ee744a89466 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -99,7 +99,7 @@ public:
[[nodiscard]] bool is_cancelled() const { return !_exec_status.ok(); }
- void cancel_all_pipeline_context(const Status& reason);
+ void cancel_all_pipeline_context(const Status& reason, int fragment_id =
-1);
std::string print_all_pipeline_context();
Status cancel_pipeline_context(const int fragment_id, const Status&
reason);
void set_pipeline_context(const int fragment_id,
diff --git a/be/src/vec/common/hash_table/hash_map_context.h
b/be/src/vec/common/hash_table/hash_map_context.h
index 8795d90553a..6ca0653f7a9 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -31,9 +31,12 @@
#include "vec/common/hash_table/string_hash_map.h"
#include "vec/common/string_ref.h"
#include "vec/core/types.h"
-#include "vec/exec/join/join_op.h"
#include "vec/utils/util.hpp"
+namespace doris::pipeline {
+struct RowRefListWithFlags;
+}
+
namespace doris::vectorized {
constexpr auto BITSIZE = 8;
@@ -595,12 +598,13 @@ using FixedKeyHashTableContext =
MethodKeysFixed<JoinHashMap<Key, HashCRC32<Key>
template <class Key, bool has_null>
using SetFixedKeyHashTableContext =
- MethodKeysFixed<HashMap<Key, RowRefListWithFlags, HashCRC32<Key>>,
has_null>;
+ MethodKeysFixed<HashMap<Key, pipeline::RowRefListWithFlags,
HashCRC32<Key>>, has_null>;
template <class T>
using SetPrimaryTypeHashTableContext =
- MethodOneNumber<T, HashMap<T, RowRefListWithFlags, HashCRC32<T>>>;
+ MethodOneNumber<T, HashMap<T, pipeline::RowRefListWithFlags,
HashCRC32<T>>>;
-using SetSerializedHashTableContext = MethodSerialized<HashMap<StringRef,
RowRefListWithFlags>>;
+using SetSerializedHashTableContext =
+ MethodSerialized<HashMap<StringRef, pipeline::RowRefListWithFlags>>;
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/common/hash_table/hash_table_set_build.h
b/be/src/vec/common/hash_table/hash_table_set_build.h
index 8101360a9f9..f9aeeeef14c 100644
--- a/be/src/vec/common/hash_table/hash_table_set_build.h
+++ b/be/src/vec/common/hash_table/hash_table_set_build.h
@@ -20,6 +20,7 @@
#include "vec/columns/column.h"
namespace doris::vectorized {
+constexpr size_t CHECK_FRECUENCY = 65536;
template <class HashTableContext, bool is_intersect>
struct HashTableBuild {
template <typename Parent>
diff --git a/be/src/vec/exec/join/vacquire_list.hpp
b/be/src/vec/exec/join/vacquire_list.hpp
deleted file mode 100644
index 6a3157cc384..00000000000
--- a/be/src/vec/exec/join/vacquire_list.hpp
+++ /dev/null
@@ -1,54 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <memory>
-#include <vector>
-
-namespace doris::vectorized {
-
-template <typename Element, int batch_size = 8>
-struct AcquireList {
- using Batch = Element[batch_size];
-
- Element& acquire(Element&& element) {
- if (_current_batch == nullptr) {
- _current_batch.reset(new Element[batch_size]);
- }
- if (current_full()) {
- _lst.emplace_back(std::move(_current_batch));
- _current_batch.reset(new Element[batch_size]);
- _current_offset = 0;
- }
-
- auto base_addr = _current_batch.get();
- base_addr[_current_offset] = std::move(element);
- auto& ref = base_addr[_current_offset];
- _current_offset++;
- return ref;
- }
-
- void remove_last_element() { _current_offset--; }
-
-private:
- bool current_full() { return _current_offset == batch_size; }
- std::vector<std::unique_ptr<Element[]>> _lst;
- std::unique_ptr<Element[]> _current_batch;
- int _current_offset = 0;
-};
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/join/vhash_join_node.h
b/be/src/vec/exec/join/vhash_join_node.h
deleted file mode 100644
index 80d278a220f..00000000000
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ /dev/null
@@ -1,180 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <gen_cpp/PlanNodes_types.h>
-#include <stddef.h>
-#include <stdint.h>
-
-#include <atomic>
-#include <iosfwd>
-#include <memory>
-#include <string>
-#include <unordered_map>
-#include <variant>
-#include <vector>
-
-#include "common/global_types.h"
-#include "common/status.h"
-#include "exprs/runtime_filter_slots.h"
-#include "util/runtime_profile.h"
-#include "vec/aggregate_functions/aggregate_function.h"
-#include "vec/columns/column.h"
-#include "vec/columns/columns_number.h"
-#include "vec/common/arena.h"
-#include "vec/common/columns_hashing.h"
-#include "vec/common/hash_table/hash_map_context.h"
-#include "vec/common/hash_table/hash_map_context_creator.h"
-#include "vec/common/hash_table/partitioned_hash_map.h"
-#include "vec/common/string_ref.h"
-#include "vec/core/block.h"
-#include "vec/core/types.h"
-#include "vec/exec/join/join_op.h" // IWYU pragma: keep
-#include "vec/exprs/vexpr_fwd.h"
-
-template <typename T>
-struct HashCRC32;
-
-namespace doris {
-class ObjectPool;
-class DescriptorTbl;
-class RuntimeState;
-
-namespace pipeline {
-class HashJoinProbeLocalState;
-class HashJoinBuildSinkLocalState;
-} // namespace pipeline
-
-namespace vectorized {
-
-constexpr size_t CHECK_FRECUENCY = 65536;
-
-struct UInt128;
-struct UInt256;
-template <int JoinOpType>
-struct ProcessHashTableProbe;
-
-template <typename Parent>
-Status process_runtime_filter_build(RuntimeState* state, Block* block, Parent*
parent,
- bool is_global = false) {
- if (parent->runtime_filters().empty()) {
- return Status::OK();
- }
- uint64_t rows = block->rows();
- {
- SCOPED_TIMER(parent->_runtime_filter_init_timer);
- RETURN_IF_ERROR(parent->_runtime_filter_slots->init_filters(state,
rows));
- RETURN_IF_ERROR(parent->_runtime_filter_slots->ignore_filters(state));
- }
-
- if (!parent->_runtime_filter_slots->empty() && rows > 1) {
- SCOPED_TIMER(parent->_runtime_filter_compute_timer);
- parent->_runtime_filter_slots->insert(block);
- }
- {
- SCOPED_TIMER(parent->_publish_runtime_filter_timer);
- RETURN_IF_ERROR(parent->_runtime_filter_slots->publish());
- }
-
- return Status::OK();
-}
-
-using ProfileCounter = RuntimeProfile::Counter;
-
-template <class HashTableContext, typename Parent>
-struct ProcessHashTableBuild {
- ProcessHashTableBuild(int rows, ColumnRawPtrs& build_raw_ptrs, Parent*
parent, int batch_size,
- RuntimeState* state)
- : _rows(rows),
- _build_raw_ptrs(build_raw_ptrs),
- _parent(parent),
- _batch_size(batch_size),
- _state(state) {}
-
- template <int JoinOpType, bool ignore_null, bool short_circuit_for_null,
- bool with_other_conjuncts>
- Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map,
bool* has_null_key) {
- if (short_circuit_for_null || ignore_null) {
- // first row is mocked and is null
- for (uint32_t i = 1; i < _rows; i++) {
- if ((*null_map)[i]) {
- *has_null_key = true;
- }
- }
- if (short_circuit_for_null && *has_null_key) {
- return Status::OK();
- }
- }
-
- SCOPED_TIMER(_parent->_build_table_insert_timer);
- hash_table_ctx.hash_table->template prepare_build<JoinOpType>(_rows,
_batch_size,
-
*has_null_key);
-
- hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
- null_map ? null_map->data() :
nullptr, true, true,
-
hash_table_ctx.hash_table->get_bucket_size());
- hash_table_ctx.hash_table->template build<JoinOpType,
with_other_conjuncts>(
- hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(), _rows);
- hash_table_ctx.bucket_nums.resize(_batch_size);
- hash_table_ctx.bucket_nums.shrink_to_fit();
-
- COUNTER_SET(_parent->_hash_table_memory_usage,
- (int64_t)hash_table_ctx.hash_table->get_byte_size());
- COUNTER_SET(_parent->_build_arena_memory_usage,
- (int64_t)hash_table_ctx.serialized_keys_size(true));
- return Status::OK();
- }
-
-private:
- const uint32_t _rows;
- ColumnRawPtrs& _build_raw_ptrs;
- Parent* _parent = nullptr;
- int _batch_size;
- RuntimeState* _state = nullptr;
-};
-
-using I8HashTableContext = PrimaryTypeHashTableContext<UInt8>;
-using I16HashTableContext = PrimaryTypeHashTableContext<UInt16>;
-using I32HashTableContext = PrimaryTypeHashTableContext<UInt32>;
-using I64HashTableContext = PrimaryTypeHashTableContext<UInt64>;
-using I128HashTableContext = PrimaryTypeHashTableContext<UInt128>;
-using I256HashTableContext = PrimaryTypeHashTableContext<UInt256>;
-
-template <bool has_null>
-using I64FixedKeyHashTableContext = FixedKeyHashTableContext<UInt64, has_null>;
-
-template <bool has_null>
-using I128FixedKeyHashTableContext = FixedKeyHashTableContext<UInt128,
has_null>;
-
-template <bool has_null>
-using I256FixedKeyHashTableContext = FixedKeyHashTableContext<UInt256,
has_null>;
-
-template <bool has_null>
-using I136FixedKeyHashTableContext = FixedKeyHashTableContext<UInt136,
has_null>;
-
-using HashTableVariants =
- std::variant<std::monostate, SerializedHashTableContext,
I8HashTableContext,
- I16HashTableContext, I32HashTableContext,
I64HashTableContext,
- I128HashTableContext, I256HashTableContext,
I64FixedKeyHashTableContext<true>,
- I64FixedKeyHashTableContext<false>,
I128FixedKeyHashTableContext<true>,
- I128FixedKeyHashTableContext<false>,
I256FixedKeyHashTableContext<true>,
- I256FixedKeyHashTableContext<false>,
I136FixedKeyHashTableContext<true>,
- I136FixedKeyHashTableContext<false>>;
-
-} // namespace vectorized
-} // namespace doris
diff --git a/be/src/vec/exec/scan/vscan_node.h
b/be/src/vec/exec/scan/vscan_node.h
index cd869cae5d2..70ac5970274 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -17,59 +17,7 @@
#pragma once
-#include <fmt/format.h>
-#include <gen_cpp/Exprs_types.h>
-#include <gen_cpp/PlanNodes_types.h>
-#include <parallel_hashmap/phmap.h>
-#include <stdint.h>
-
-#include <functional>
-#include <list>
-#include <map>
-#include <memory>
-#include <string>
-#include <unordered_map>
-#include <utility>
-#include <vector>
-
-#include "common/global_types.h"
-#include "common/object_pool.h"
-#include "common/status.h"
-#include "exec/olap_common.h"
-#include "exprs/function_filter.h"
-#include "runtime/define_primitive_type.h"
-#include "runtime/query_context.h"
-#include "runtime/runtime_state.h"
-#include "util/runtime_profile.h"
-#include "vec/exec/runtime_filter_consumer.h"
-#include "vec/exec/scan/scanner_context.h"
#include "vec/exec/scan/vscanner.h"
-#include "vec/runtime/shared_scanner_controller.h"
-
-namespace doris {
-class BitmapFilterFuncBase;
-class BloomFilterFuncBase;
-class DescriptorTbl;
-class FunctionContext;
-class HybridSetBase;
-class IRuntimeFilter;
-class SlotDescriptor;
-class TScanRangeParams;
-class TupleDescriptor;
-
-namespace vectorized {
-class Block;
-class VExpr;
-class VExprContext;
-class VInPredicate;
-class VectorizedFnCall;
-} // namespace vectorized
-struct StringRef;
-} // namespace doris
-
-namespace doris::pipeline {
-class ScanOperator;
-}
namespace doris::vectorized {
diff --git a/be/src/vec/exec/vaggregation_node.h
b/be/src/vec/exec/vaggregation_node.h
deleted file mode 100644
index fd369d80278..00000000000
--- a/be/src/vec/exec/vaggregation_node.h
+++ /dev/null
@@ -1,366 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <assert.h>
-#include <glog/logging.h>
-#include <stddef.h>
-#include <stdint.h>
-
-#include <algorithm>
-#include <functional>
-#include <memory>
-#include <ostream>
-#include <string>
-#include <type_traits>
-#include <utility>
-#include <variant>
-#include <vector>
-
-#include "common/compiler_util.h" // IWYU pragma: keep
-#include "common/global_types.h"
-#include "common/status.h"
-#include "util/runtime_profile.h"
-#include "vec/aggregate_functions/aggregate_function.h"
-#include "vec/columns/column.h"
-#include "vec/columns/column_nullable.h"
-#include "vec/columns/column_string.h"
-#include "vec/columns/columns_number.h"
-#include "vec/common/allocator.h"
-#include "vec/common/arena.h"
-#include "vec/common/assert_cast.h"
-#include "vec/common/columns_hashing.h"
-#include "vec/common/hash_table/hash.h"
-#include "vec/common/hash_table/hash_map_context.h"
-#include "vec/common/hash_table/hash_map_context_creator.h"
-#include "vec/common/hash_table/hash_map_util.h"
-#include "vec/common/hash_table/partitioned_hash_map.h"
-#include "vec/common/hash_table/ph_hash_map.h"
-#include "vec/common/hash_table/string_hash_map.h"
-#include "vec/common/pod_array.h"
-#include "vec/common/string_ref.h"
-#include "vec/common/uint128.h"
-#include "vec/core/block.h"
-#include "vec/core/block_spill_reader.h"
-#include "vec/core/block_spill_writer.h"
-#include "vec/core/column_with_type_and_name.h"
-#include "vec/core/types.h"
-#include "vec/exprs/vectorized_agg_fn.h"
-#include "vec/exprs/vexpr.h"
-#include "vec/exprs/vexpr_context.h"
-#include "vec/exprs/vslot_ref.h"
-
-namespace doris {
-class TPlanNode;
-class DescriptorTbl;
-class ObjectPool;
-class RuntimeState;
-class TupleDescriptor;
-
-namespace pipeline {
-class AggSinkOperator;
-class AggSourceOperator;
-class StreamingAggSinkOperator;
-class StreamingAggSourceOperator;
-} // namespace pipeline
-
-namespace vectorized {
-
-using AggregatedDataWithoutKey = AggregateDataPtr;
-using AggregatedDataWithStringKey = PHHashMap<StringRef, AggregateDataPtr>;
-using AggregatedDataWithShortStringKey = StringHashMap<AggregateDataPtr>;
-using AggregatedDataWithUInt8Key = PHHashMap<UInt8, AggregateDataPtr>;
-using AggregatedDataWithUInt16Key = PHHashMap<UInt16, AggregateDataPtr>;
-using AggregatedDataWithUInt32Key = PHHashMap<UInt32, AggregateDataPtr,
HashCRC32<UInt32>>;
-using AggregatedDataWithUInt64Key = PHHashMap<UInt64, AggregateDataPtr,
HashCRC32<UInt64>>;
-using AggregatedDataWithUInt128Key = PHHashMap<UInt128, AggregateDataPtr,
HashCRC32<UInt128>>;
-using AggregatedDataWithUInt256Key = PHHashMap<UInt256, AggregateDataPtr,
HashCRC32<UInt256>>;
-using AggregatedDataWithUInt136Key = PHHashMap<UInt136, AggregateDataPtr,
HashCRC32<UInt136>>;
-
-using AggregatedDataWithUInt32KeyPhase2 =
- PHHashMap<UInt32, AggregateDataPtr, HashMixWrapper<UInt32>>;
-using AggregatedDataWithUInt64KeyPhase2 =
- PHHashMap<UInt64, AggregateDataPtr, HashMixWrapper<UInt64>>;
-using AggregatedDataWithUInt128KeyPhase2 =
- PHHashMap<UInt128, AggregateDataPtr, HashMixWrapper<UInt128>>;
-using AggregatedDataWithUInt256KeyPhase2 =
- PHHashMap<UInt256, AggregateDataPtr, HashMixWrapper<UInt256>>;
-
-using AggregatedDataWithUInt136KeyPhase2 =
- PHHashMap<UInt136, AggregateDataPtr, HashMixWrapper<UInt136>>;
-
-using AggregatedDataWithNullableUInt8Key =
DataWithNullKey<AggregatedDataWithUInt8Key>;
-using AggregatedDataWithNullableUInt16Key =
DataWithNullKey<AggregatedDataWithUInt16Key>;
-using AggregatedDataWithNullableUInt32Key =
DataWithNullKey<AggregatedDataWithUInt32Key>;
-using AggregatedDataWithNullableUInt64Key =
DataWithNullKey<AggregatedDataWithUInt64Key>;
-using AggregatedDataWithNullableUInt32KeyPhase2 =
- DataWithNullKey<AggregatedDataWithUInt32KeyPhase2>;
-using AggregatedDataWithNullableUInt64KeyPhase2 =
- DataWithNullKey<AggregatedDataWithUInt64KeyPhase2>;
-using AggregatedDataWithNullableShortStringKey =
DataWithNullKey<AggregatedDataWithShortStringKey>;
-using AggregatedDataWithNullableUInt128Key =
DataWithNullKey<AggregatedDataWithUInt128Key>;
-using AggregatedDataWithNullableUInt128KeyPhase2 =
- DataWithNullKey<AggregatedDataWithUInt128KeyPhase2>;
-
-using AggregatedMethodVariants = std::variant<
- std::monostate, MethodSerialized<AggregatedDataWithStringKey>,
- MethodOneNumber<UInt8, AggregatedDataWithUInt8Key>,
- MethodOneNumber<UInt16, AggregatedDataWithUInt16Key>,
- MethodOneNumber<UInt32, AggregatedDataWithUInt32Key>,
- MethodOneNumber<UInt64, AggregatedDataWithUInt64Key>,
- MethodStringNoCache<AggregatedDataWithShortStringKey>,
- MethodOneNumber<UInt128, AggregatedDataWithUInt128Key>,
- MethodOneNumber<UInt32, AggregatedDataWithUInt32KeyPhase2>,
- MethodOneNumber<UInt64, AggregatedDataWithUInt64KeyPhase2>,
- MethodOneNumber<UInt128, AggregatedDataWithUInt128KeyPhase2>,
- MethodSingleNullableColumn<MethodOneNumber<UInt8,
AggregatedDataWithNullableUInt8Key>>,
- MethodSingleNullableColumn<MethodOneNumber<UInt16,
AggregatedDataWithNullableUInt16Key>>,
- MethodSingleNullableColumn<MethodOneNumber<UInt32,
AggregatedDataWithNullableUInt32Key>>,
- MethodSingleNullableColumn<MethodOneNumber<UInt64,
AggregatedDataWithNullableUInt64Key>>,
- MethodSingleNullableColumn<
- MethodOneNumber<UInt32,
AggregatedDataWithNullableUInt32KeyPhase2>>,
- MethodSingleNullableColumn<
- MethodOneNumber<UInt64,
AggregatedDataWithNullableUInt64KeyPhase2>>,
- MethodSingleNullableColumn<MethodOneNumber<UInt128,
AggregatedDataWithNullableUInt128Key>>,
- MethodSingleNullableColumn<
- MethodOneNumber<UInt128,
AggregatedDataWithNullableUInt128KeyPhase2>>,
-
MethodSingleNullableColumn<MethodStringNoCache<AggregatedDataWithNullableShortStringKey>>,
- MethodKeysFixed<AggregatedDataWithUInt64Key, false>,
- MethodKeysFixed<AggregatedDataWithUInt64Key, true>,
- MethodKeysFixed<AggregatedDataWithUInt128Key, false>,
- MethodKeysFixed<AggregatedDataWithUInt128Key, true>,
- MethodKeysFixed<AggregatedDataWithUInt256Key, false>,
- MethodKeysFixed<AggregatedDataWithUInt256Key, true>,
- MethodKeysFixed<AggregatedDataWithUInt136Key, false>,
- MethodKeysFixed<AggregatedDataWithUInt136Key, true>,
- MethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, false>,
- MethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, true>,
- MethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, false>,
- MethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, true>,
- MethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, false>,
- MethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, true>,
- MethodKeysFixed<AggregatedDataWithUInt136KeyPhase2, false>,
- MethodKeysFixed<AggregatedDataWithUInt136KeyPhase2, true>>;
-
-struct AggregatedDataVariants
- : public DataVariants<AggregatedMethodVariants,
MethodSingleNullableColumn, MethodOneNumber,
- MethodKeysFixed, DataWithNullKey> {
- AggregatedDataWithoutKey without_key = nullptr;
-
- template <bool nullable>
- void init(Type type) {
- _type = type;
- switch (_type) {
- case Type::without_key:
- break;
- case Type::serialized:
-
method_variant.emplace<MethodSerialized<AggregatedDataWithStringKey>>();
- break;
- case Type::int8_key:
- emplace_single<UInt8, AggregatedDataWithUInt8Key, nullable>();
- break;
- case Type::int16_key:
- emplace_single<UInt16, AggregatedDataWithUInt16Key, nullable>();
- break;
- case Type::int32_key:
- emplace_single<UInt32, AggregatedDataWithUInt32Key, nullable>();
- break;
- case Type::int32_key_phase2:
- emplace_single<UInt32, AggregatedDataWithUInt32KeyPhase2,
nullable>();
- break;
- case Type::int64_key:
- emplace_single<UInt64, AggregatedDataWithUInt64Key, nullable>();
- break;
- case Type::int64_key_phase2:
- emplace_single<UInt64, AggregatedDataWithUInt64KeyPhase2,
nullable>();
- break;
- case Type::int128_key:
- emplace_single<UInt128, AggregatedDataWithUInt128Key, nullable>();
- break;
- case Type::int128_key_phase2:
- emplace_single<UInt128, AggregatedDataWithUInt128KeyPhase2,
nullable>();
- break;
- case Type::string_key:
- if (nullable) {
- method_variant.emplace<MethodSingleNullableColumn<
-
MethodStringNoCache<AggregatedDataWithNullableShortStringKey>>>();
- } else {
-
method_variant.emplace<MethodStringNoCache<AggregatedDataWithShortStringKey>>();
- }
- break;
- default:
- throw Exception(ErrorCode::INTERNAL_ERROR, "meet invalid key type,
type={}", type);
- }
- }
-
- void init(Type type, bool is_nullable = false) {
- if (is_nullable) {
- init<true>(type);
- } else {
- init<false>(type);
- }
- }
-};
-
-using AggregatedDataVariantsUPtr = std::unique_ptr<AggregatedDataVariants>;
-using ArenaUPtr = std::unique_ptr<Arena>;
-
-struct AggregateDataContainer {
-public:
- AggregateDataContainer(size_t size_of_key, size_t size_of_aggregate_states)
- : _size_of_key(size_of_key),
_size_of_aggregate_states(size_of_aggregate_states) {}
-
- int64_t memory_usage() const { return _arena_pool.size(); }
-
- template <typename KeyType>
- AggregateDataPtr append_data(const KeyType& key) {
- DCHECK_EQ(sizeof(KeyType), _size_of_key);
- // SUB_CONTAINER_CAPACITY should add a new sub container, and also
expand when it is zero
- if (UNLIKELY(_index_in_sub_container % SUB_CONTAINER_CAPACITY == 0)) {
- _expand();
- }
-
- *reinterpret_cast<KeyType*>(_current_keys) = key;
- auto aggregate_data = _current_agg_data;
- ++_total_count;
- ++_index_in_sub_container;
- _current_agg_data += _size_of_aggregate_states;
- _current_keys += _size_of_key;
- return aggregate_data;
- }
-
- template <typename Derived, bool IsConst>
- class IteratorBase {
- using Container =
- std::conditional_t<IsConst, const AggregateDataContainer,
AggregateDataContainer>;
-
- Container* container = nullptr;
- uint32_t index;
- uint32_t sub_container_index;
- uint32_t index_in_sub_container;
-
- friend class HashTable;
-
- public:
- IteratorBase() = default;
- IteratorBase(Container* container_, uint32_t index_)
- : container(container_), index(index_) {
- sub_container_index = index / SUB_CONTAINER_CAPACITY;
- index_in_sub_container = index - sub_container_index *
SUB_CONTAINER_CAPACITY;
- }
-
- bool operator==(const IteratorBase& rhs) const { return index ==
rhs.index; }
- bool operator!=(const IteratorBase& rhs) const { return index !=
rhs.index; }
-
- Derived& operator++() {
- index++;
- index_in_sub_container++;
- if (index_in_sub_container == SUB_CONTAINER_CAPACITY) {
- index_in_sub_container = 0;
- sub_container_index++;
- }
- return static_cast<Derived&>(*this);
- }
-
- template <typename KeyType>
- KeyType get_key() {
- DCHECK_EQ(sizeof(KeyType), container->_size_of_key);
- return
((KeyType*)(container->_key_containers[sub_container_index]))
- [index_in_sub_container];
- }
-
- AggregateDataPtr get_aggregate_data() {
- return &(container->_value_containers[sub_container_index]
-
[container->_size_of_aggregate_states *
- index_in_sub_container]);
- }
- };
-
- class Iterator : public IteratorBase<Iterator, false> {
- public:
- using IteratorBase<Iterator, false>::IteratorBase;
- };
-
- class ConstIterator : public IteratorBase<ConstIterator, true> {
- public:
- using IteratorBase<ConstIterator, true>::IteratorBase;
- };
-
- ConstIterator begin() const { return ConstIterator(this, 0); }
-
- ConstIterator cbegin() const { return begin(); }
-
- Iterator begin() { return Iterator(this, 0); }
-
- ConstIterator end() const { return ConstIterator(this, _total_count); }
- ConstIterator cend() const { return end(); }
- Iterator end() { return Iterator(this, _total_count); }
-
- void init_once() {
- if (_inited) {
- return;
- }
- _inited = true;
- iterator = begin();
- }
- Iterator iterator;
-
-private:
- void _expand() {
- _index_in_sub_container = 0;
- _current_keys = nullptr;
- _current_agg_data = nullptr;
- try {
- _current_keys = _arena_pool.alloc(_size_of_key *
SUB_CONTAINER_CAPACITY);
- _key_containers.emplace_back(_current_keys);
-
- _current_agg_data =
(AggregateDataPtr)_arena_pool.alloc(_size_of_aggregate_states *
-
SUB_CONTAINER_CAPACITY);
- _value_containers.emplace_back(_current_agg_data);
- } catch (...) {
- if (_current_keys) {
- _key_containers.pop_back();
- _current_keys = nullptr;
- }
- if (_current_agg_data) {
- _value_containers.pop_back();
- _current_agg_data = nullptr;
- }
- throw;
- }
- }
-
- static constexpr uint32_t SUB_CONTAINER_CAPACITY = 8192;
- Arena _arena_pool;
- std::vector<char*> _key_containers;
- std::vector<AggregateDataPtr> _value_containers;
- AggregateDataPtr _current_agg_data = nullptr;
- char* _current_keys = nullptr;
- size_t _size_of_key {};
- size_t _size_of_aggregate_states {};
- uint32_t _index_in_sub_container {};
- uint32_t _total_count {};
- bool _inited = false;
-};
-
-} // namespace vectorized
-
-constexpr auto init_agg_hash_method =
- init_hash_method<vectorized::AggregatedDataVariants,
vectorized::AggregateDataPtr>;
-
-} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]