This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 738a1644ffd [refactor](operator) We don't need OperatorBuilder to 
instantiate Operator (#29133)
738a1644ffd is described below

commit 738a1644ffd5157069be121f4541894d624098c8
Author: zhiqiang <[email protected]>
AuthorDate: Fri Dec 29 13:54:54 2023 +0800

    [refactor](operator) We don't need OperatorBuilder to instantiate Operator 
(#29133)
---
 be/src/pipeline/exec/aggregation_sink_operator.h   |  2 +-
 be/src/pipeline/exec/aggregation_source_operator.h |  2 +-
 be/src/pipeline/exec/analytic_sink_operator.h      |  2 +-
 be/src/pipeline/exec/analytic_source_operator.h    |  2 +-
 be/src/pipeline/exec/assert_num_rows_operator.h    |  2 +-
 be/src/pipeline/exec/const_value_operator.h        |  2 +-
 be/src/pipeline/exec/datagen_operator.h            |  2 +-
 .../distinct_streaming_aggregation_sink_operator.h |  2 +-
 ...istinct_streaming_aggregation_source_operator.h |  2 +-
 be/src/pipeline/exec/empty_set_operator.h          |  2 +-
 be/src/pipeline/exec/exchange_sink_operator.h      |  2 +-
 be/src/pipeline/exec/exchange_source_operator.h    |  2 +-
 .../exec/group_commit_block_sink_operator.h        |  2 +-
 be/src/pipeline/exec/hashjoin_build_sink.h         |  2 +-
 be/src/pipeline/exec/hashjoin_probe_operator.h     |  2 +-
 be/src/pipeline/exec/multi_cast_data_stream_sink.h |  2 +-
 be/src/pipeline/exec/mysql_scan_operator.h         |  2 +-
 .../exec/nested_loop_join_build_operator.h         |  2 +-
 .../exec/nested_loop_join_probe_operator.h         |  2 +-
 be/src/pipeline/exec/olap_table_sink_operator.h    |  2 +-
 be/src/pipeline/exec/olap_table_sink_v2_operator.h |  2 +-
 be/src/pipeline/exec/operator.h                    | 48 +++++++++-------------
 .../pipeline/exec/partition_sort_sink_operator.h   |  2 +-
 .../pipeline/exec/partition_sort_source_operator.h |  3 +-
 be/src/pipeline/exec/repeat_operator.h             |  2 +-
 be/src/pipeline/exec/result_file_sink_operator.h   |  2 +-
 be/src/pipeline/exec/result_sink_operator.h        |  2 +-
 be/src/pipeline/exec/scan_operator.h               |  2 +-
 be/src/pipeline/exec/schema_scan_operator.h        |  2 +-
 be/src/pipeline/exec/select_operator.h             |  2 +-
 be/src/pipeline/exec/set_probe_sink_operator.cpp   |  3 +-
 be/src/pipeline/exec/set_probe_sink_operator.h     |  2 +-
 be/src/pipeline/exec/set_sink_operator.cpp         |  2 +-
 be/src/pipeline/exec/set_sink_operator.h           |  2 +-
 be/src/pipeline/exec/set_source_operator.cpp       |  2 +-
 be/src/pipeline/exec/set_source_operator.h         |  2 +-
 be/src/pipeline/exec/sort_sink_operator.h          |  2 +-
 be/src/pipeline/exec/sort_source_operator.h        |  2 +-
 .../exec/streaming_aggregation_sink_operator.h     |  2 +-
 .../exec/streaming_aggregation_source_operator.h   |  2 +-
 be/src/pipeline/exec/table_function_operator.h     |  2 +-
 be/src/pipeline/exec/table_sink_operator.h         |  2 +-
 be/src/pipeline/exec/union_sink_operator.h         |  2 +-
 be/src/pipeline/exec/union_source_operator.h       |  2 +-
 44 files changed, 63 insertions(+), 73 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 97be9dcd6a3..d02e9a0f52a 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -39,7 +39,7 @@ public:
     bool is_sink() const override { return true; }
 };
 
-class AggSinkOperator final : public StreamingOperator<AggSinkOperatorBuilder> 
{
+class AggSinkOperator final : public 
StreamingOperator<vectorized::AggregationNode> {
 public:
     AggSinkOperator(OperatorBuilderBase* operator_builder, ExecNode* node);
     bool can_write() override { return true; }
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h 
b/be/src/pipeline/exec/aggregation_source_operator.h
index c30277cbec5..a7125c42ff6 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -39,7 +39,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class AggSourceOperator final : public 
SourceOperator<AggSourceOperatorBuilder> {
+class AggSourceOperator final : public 
SourceOperator<vectorized::AggregationNode> {
 public:
     AggSourceOperator(OperatorBuilderBase*, ExecNode*);
     // if exec node split to: sink, source operator. the source operator
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h 
b/be/src/pipeline/exec/analytic_sink_operator.h
index 3e0eb85f76d..e99a0731b34 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -38,7 +38,7 @@ public:
     bool is_sink() const override { return true; }
 };
 
-class AnalyticSinkOperator final : public 
StreamingOperator<AnalyticSinkOperatorBuilder> {
+class AnalyticSinkOperator final : public 
StreamingOperator<vectorized::VAnalyticEvalNode> {
 public:
     AnalyticSinkOperator(OperatorBuilderBase* operator_builder, ExecNode* 
node);
 
diff --git a/be/src/pipeline/exec/analytic_source_operator.h 
b/be/src/pipeline/exec/analytic_source_operator.h
index b9674b908c6..ba7b00be906 100644
--- a/be/src/pipeline/exec/analytic_source_operator.h
+++ b/be/src/pipeline/exec/analytic_source_operator.h
@@ -39,7 +39,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class AnalyticSourceOperator final : public 
SourceOperator<AnalyticSourceOperatorBuilder> {
+class AnalyticSourceOperator final : public 
SourceOperator<vectorized::VAnalyticEvalNode> {
 public:
     AnalyticSourceOperator(OperatorBuilderBase*, ExecNode*);
 
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.h 
b/be/src/pipeline/exec/assert_num_rows_operator.h
index bb5e65168b6..43036b597cf 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.h
+++ b/be/src/pipeline/exec/assert_num_rows_operator.h
@@ -33,7 +33,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class AssertNumRowsOperator final : public 
StreamingOperator<AssertNumRowsOperatorBuilder> {
+class AssertNumRowsOperator final : public 
StreamingOperator<vectorized::VAssertNumRowsNode> {
 public:
     AssertNumRowsOperator(OperatorBuilderBase* operator_builder, ExecNode* 
node)
             : StreamingOperator(operator_builder, node) {}
diff --git a/be/src/pipeline/exec/const_value_operator.h 
b/be/src/pipeline/exec/const_value_operator.h
index 120c6753ee8..e87edae494b 100644
--- a/be/src/pipeline/exec/const_value_operator.h
+++ b/be/src/pipeline/exec/const_value_operator.h
@@ -36,7 +36,7 @@ public:
     bool is_source() const override { return true; }
 };
 
-class ConstValueOperator final : public 
SourceOperator<ConstValueOperatorBuilder> {
+class ConstValueOperator final : public SourceOperator<vectorized::VUnionNode> 
{
 public:
     ConstValueOperator(OperatorBuilderBase* operator_builder, ExecNode* node)
             : SourceOperator(operator_builder, node) {}
diff --git a/be/src/pipeline/exec/datagen_operator.h 
b/be/src/pipeline/exec/datagen_operator.h
index 1b8ef2977f9..f34dfc61df0 100644
--- a/be/src/pipeline/exec/datagen_operator.h
+++ b/be/src/pipeline/exec/datagen_operator.h
@@ -38,7 +38,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class DataGenOperator : public SourceOperator<DataGenOperatorBuilder> {
+class DataGenOperator : public 
SourceOperator<vectorized::VDataGenFunctionScanNode> {
 public:
     DataGenOperator(OperatorBuilderBase* operator_builder, ExecNode* 
datagen_node);
 
diff --git 
a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
index b30a8298727..d62178460ea 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
@@ -54,7 +54,7 @@ private:
 };
 
 class DistinctStreamingAggSinkOperator final
-        : public StreamingOperator<DistinctStreamingAggSinkOperatorBuilder> {
+        : public StreamingOperator<vectorized::DistinctAggregationNode> {
 public:
     DistinctStreamingAggSinkOperator(OperatorBuilderBase* operator_builder, 
ExecNode*,
                                      std::shared_ptr<DataQueue>);
diff --git 
a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h
index 0b4851ad97d..81edf18567f 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h
@@ -51,7 +51,7 @@ private:
 };
 
 class DistinctStreamingAggSourceOperator final
-        : public SourceOperator<DistinctStreamingAggSourceOperatorBuilder> {
+        : public SourceOperator<vectorized::DistinctAggregationNode> {
 public:
     DistinctStreamingAggSourceOperator(OperatorBuilderBase*, ExecNode*, 
std::shared_ptr<DataQueue>);
     bool can_read() override;
diff --git a/be/src/pipeline/exec/empty_set_operator.h 
b/be/src/pipeline/exec/empty_set_operator.h
index 126f132dd98..4e2607cd4ce 100644
--- a/be/src/pipeline/exec/empty_set_operator.h
+++ b/be/src/pipeline/exec/empty_set_operator.h
@@ -37,7 +37,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class EmptySetSourceOperator final : public 
SourceOperator<EmptySetSourceOperatorBuilder> {
+class EmptySetSourceOperator final : public 
SourceOperator<vectorized::VEmptySetNode> {
 public:
     EmptySetSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* 
empty_set_node);
     bool can_read() override { return true; }
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index a34c4f4b435..1ddfc5f6174 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -46,7 +46,7 @@ private:
 };
 
 // Now local exchange is not supported since VDataStreamRecvr is considered as 
a pipeline broker.
-class ExchangeSinkOperator final : public 
DataSinkOperator<ExchangeSinkOperatorBuilder> {
+class ExchangeSinkOperator final : public 
DataSinkOperator<vectorized::VDataStreamSender> {
 public:
     ExchangeSinkOperator(OperatorBuilderBase* operator_builder, DataSink* 
sink, int mult_cast_id);
     Status init(const TDataSink& tsink) override;
diff --git a/be/src/pipeline/exec/exchange_source_operator.h 
b/be/src/pipeline/exec/exchange_source_operator.h
index b621da38072..bbccfe52987 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -43,7 +43,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class ExchangeSourceOperator final : public 
SourceOperator<ExchangeSourceOperatorBuilder> {
+class ExchangeSourceOperator final : public 
SourceOperator<vectorized::VExchangeNode> {
 public:
     ExchangeSourceOperator(OperatorBuilderBase*, ExecNode*);
     bool can_read() override;
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h 
b/be/src/pipeline/exec/group_commit_block_sink_operator.h
index 0cf36818db1..ad9198f4399 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.h
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h
@@ -34,7 +34,7 @@ public:
 };
 
 class GroupCommitBlockSinkOperator final
-        : public DataSinkOperator<GroupCommitBlockSinkOperatorBuilder> {
+        : public DataSinkOperator<vectorized::GroupCommitBlockSink> {
 public:
     GroupCommitBlockSinkOperator(OperatorBuilderBase* operator_builder, 
DataSink* sink)
             : DataSinkOperator(operator_builder, sink) {}
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index b5ae146c182..fa4635afad1 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -37,7 +37,7 @@ public:
     bool is_sink() const override { return true; }
 };
 
-class HashJoinBuildSink final : public 
StreamingOperator<HashJoinBuildSinkBuilder> {
+class HashJoinBuildSink final : public 
StreamingOperator<vectorized::HashJoinNode> {
 public:
     HashJoinBuildSink(OperatorBuilderBase* operator_builder, ExecNode* node);
     bool can_write() override { return _node->can_sink_write(); }
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 5dde597ec76..cdb1bc05c93 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -36,7 +36,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class HashJoinProbeOperator final : public 
StatefulOperator<HashJoinProbeOperatorBuilder> {
+class HashJoinProbeOperator final : public 
StatefulOperator<vectorized::HashJoinNode> {
 public:
     HashJoinProbeOperator(OperatorBuilderBase*, ExecNode*);
     // if exec node split to: sink, source operator. the source operator
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h 
b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
index a30361ebe14..965605fd3c7 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
@@ -33,7 +33,7 @@ public:
 };
 
 class MultiCastDataStreamSinkOperator final
-        : public DataSinkOperator<MultiCastDataStreamSinkOperatorBuilder> {
+        : public DataSinkOperator<vectorized::MultiCastDataStreamSink> {
 public:
     MultiCastDataStreamSinkOperator(OperatorBuilderBase* operator_builder, 
DataSink* sink)
             : DataSinkOperator(operator_builder, sink) {}
diff --git a/be/src/pipeline/exec/mysql_scan_operator.h 
b/be/src/pipeline/exec/mysql_scan_operator.h
index 3c5714a7f4e..6e21d8d2ebe 100644
--- a/be/src/pipeline/exec/mysql_scan_operator.h
+++ b/be/src/pipeline/exec/mysql_scan_operator.h
@@ -29,7 +29,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class MysqlScanOperator : public SourceOperator<MysqlScanOperatorBuilder> {
+class MysqlScanOperator : public SourceOperator<vectorized::VMysqlScanNode> {
 public:
     MysqlScanOperator(OperatorBuilderBase* operator_builder, ExecNode* 
mysql_scan_node);
 
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h 
b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index ea0820253cc..27f847cc00f 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -38,7 +38,7 @@ public:
     bool is_sink() const override { return true; }
 };
 
-class NestLoopJoinBuildOperator final : public 
StreamingOperator<NestLoopJoinBuildOperatorBuilder> {
+class NestLoopJoinBuildOperator final : public 
StreamingOperator<vectorized::VNestedLoopJoinNode> {
 public:
     NestLoopJoinBuildOperator(OperatorBuilderBase* operator_builder, ExecNode* 
node);
     bool can_write() override { return true; }
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h 
b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
index 525640beb48..1a3a0e9f9a1 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
@@ -40,7 +40,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class NestLoopJoinProbeOperator final : public 
StatefulOperator<NestLoopJoinProbeOperatorBuilder> {
+class NestLoopJoinProbeOperator final : public 
StatefulOperator<vectorized::VNestedLoopJoinNode> {
 public:
     NestLoopJoinProbeOperator(OperatorBuilderBase* operator_builder, ExecNode* 
node);
 
diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h 
b/be/src/pipeline/exec/olap_table_sink_operator.h
index 55fea64c634..6aa80776a3a 100644
--- a/be/src/pipeline/exec/olap_table_sink_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_operator.h
@@ -34,7 +34,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class OlapTableSinkOperator final : public 
DataSinkOperator<OlapTableSinkOperatorBuilder> {
+class OlapTableSinkOperator final : public 
DataSinkOperator<vectorized::VOlapTableSink> {
 public:
     OlapTableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* 
sink)
             : DataSinkOperator(operator_builder, sink) {}
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h 
b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
index 5c8cce62b8b..72b6a15e733 100644
--- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
@@ -34,7 +34,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class OlapTableSinkV2Operator final : public 
DataSinkOperator<OlapTableSinkV2OperatorBuilder> {
+class OlapTableSinkV2Operator final : public 
DataSinkOperator<vectorized::VOlapTableSinkV2> {
 public:
     OlapTableSinkV2Operator(OperatorBuilderBase* operator_builder, DataSink* 
sink)
             : DataSinkOperator(operator_builder, sink) {}
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 7ff91fba695..bb0b8f30911 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -275,18 +275,17 @@ protected:
 };
 
 /**
- * All operators inherited from DataSinkOperator will hold a SinkNode inside. 
Namely, it is a one-to-one relation between DataSinkOperator and DataSink.
+ * All operators inherited from DataSinkOperator will hold a SinkNode inside.
+ * Namely, it is a one-to-one relation between DataSinkOperator and DataSink.
  *
- * It should be mentioned that, not all SinkOperators are inherited from this 
(e.g. SortSinkOperator which holds a sort node inside instead of a DataSink).
+ * It should be mentioned that, not all SinkOperators are inherited from this
+ * (e.g. SortSinkOperator which holds a sort node inside instead of a 
DataSink).
  */
-template <typename OperatorBuilderType>
+template <typename DataSinkType>
 class DataSinkOperator : public OperatorBase {
 public:
-    using NodeType =
-            
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
-
     DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink)
-            : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) 
{}
+            : OperatorBase(builder), 
_sink(reinterpret_cast<DataSinkType*>(sink)) {}
 
     ~DataSinkOperator() override = default;
 
@@ -323,20 +322,17 @@ public:
     }
 
 protected:
-    NodeType* _sink = nullptr;
+    DataSinkType* _sink = nullptr;
 };
 
 /**
  * All operators inherited from Operator will hold a ExecNode inside.
  */
-template <typename OperatorBuilderType>
+template <typename StreamingNodeType>
 class StreamingOperator : public OperatorBase {
 public:
-    using NodeType =
-            
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
-
     StreamingOperator(OperatorBuilderBase* builder, ExecNode* node)
-            : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) 
{}
+            : OperatorBase(builder), 
_node(reinterpret_cast<StreamingNodeType*>(node)) {}
 
     ~StreamingOperator() override = default;
 
@@ -400,24 +396,21 @@ public:
     }
 
 protected:
-    NodeType* _node = nullptr;
+    StreamingNodeType* _node = nullptr;
     bool _use_projection;
 };
 
-template <typename OperatorBuilderType>
-class SourceOperator : public StreamingOperator<OperatorBuilderType> {
+template <typename SourceNodeType>
+class SourceOperator : public StreamingOperator<SourceNodeType> {
 public:
-    using NodeType =
-            
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
-
     SourceOperator(OperatorBuilderBase* builder, ExecNode* node)
-            : StreamingOperator<OperatorBuilderType>(builder, node) {}
+            : StreamingOperator<SourceNodeType>(builder, node) {}
 
     ~SourceOperator() override = default;
 
     Status get_block(RuntimeState* state, vectorized::Block* block,
                      SourceState& source_state) override {
-        auto& node = StreamingOperator<OperatorBuilderType>::_node;
+        auto& node = StreamingOperator<SourceNodeType>::_node;
         bool eos = false;
         RETURN_IF_ERROR(node->get_next_after_projects(
                 state, block, &eos,
@@ -436,14 +429,11 @@ public:
  * If there are still remain rows in probe block, we can get output block by 
calling `get_block` without any data from its child.
  * In a nutshell, it is a one-to-many relation between input blocks and output 
blocks for StatefulOperator.
  */
-template <typename OperatorBuilderType>
-class StatefulOperator : public StreamingOperator<OperatorBuilderType> {
+template <typename StatefulNodeType>
+class StatefulOperator : public StreamingOperator<StatefulNodeType> {
 public:
-    using NodeType =
-            
std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>;
-
     StatefulOperator(OperatorBuilderBase* builder, ExecNode* node)
-            : StreamingOperator<OperatorBuilderType>(builder, node),
+            : StreamingOperator<StatefulNodeType>(builder, node),
               _child_block(vectorized::Block::create_shared()),
               _child_source_state(SourceState::DEPEND_ON_SOURCE) {}
 
@@ -451,8 +441,8 @@ public:
 
     Status get_block(RuntimeState* state, vectorized::Block* block,
                      SourceState& source_state) override {
-        auto& node = StreamingOperator<OperatorBuilderType>::_node;
-        auto& child = StreamingOperator<OperatorBuilderType>::_child;
+        auto& node = StreamingOperator<StatefulNodeType>::_node;
+        auto& child = StreamingOperator<StatefulNodeType>::_child;
 
         if (node->need_more_input_data()) {
             _child_block->clear_column_data();
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h 
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index 486e7056213..a46dc46433c 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -42,7 +42,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class PartitionSortSinkOperator final : public 
StreamingOperator<PartitionSortSinkOperatorBuilder> {
+class PartitionSortSinkOperator final : public 
StreamingOperator<vectorized::VPartitionSortNode> {
 public:
     PartitionSortSinkOperator(OperatorBuilderBase* operator_builder, ExecNode* 
sort_node)
             : StreamingOperator(operator_builder, sort_node) {};
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h 
b/be/src/pipeline/exec/partition_sort_source_operator.h
index 0b955645532..d0ae26c7c4a 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.h
+++ b/be/src/pipeline/exec/partition_sort_source_operator.h
@@ -41,8 +41,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class PartitionSortSourceOperator final
-        : public SourceOperator<PartitionSortSourceOperatorBuilder> {
+class PartitionSortSourceOperator final : public 
SourceOperator<vectorized::VPartitionSortNode> {
 public:
     PartitionSortSourceOperator(OperatorBuilderBase* operator_builder, 
ExecNode* sort_node)
             : SourceOperator(operator_builder, sort_node) {}
diff --git a/be/src/pipeline/exec/repeat_operator.h 
b/be/src/pipeline/exec/repeat_operator.h
index 03196a22bdc..00555ce9dcf 100644
--- a/be/src/pipeline/exec/repeat_operator.h
+++ b/be/src/pipeline/exec/repeat_operator.h
@@ -36,7 +36,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class RepeatOperator final : public StatefulOperator<RepeatOperatorBuilder> {
+class RepeatOperator final : public StatefulOperator<vectorized::VRepeatNode> {
 public:
     RepeatOperator(OperatorBuilderBase* operator_builder, ExecNode* 
repeat_node);
 
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h 
b/be/src/pipeline/exec/result_file_sink_operator.h
index 69235ee6707..f064b8f2b7f 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -36,7 +36,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class ResultFileSinkOperator final : public 
DataSinkOperator<ResultFileSinkOperatorBuilder> {
+class ResultFileSinkOperator final : public 
DataSinkOperator<vectorized::VResultFileSink> {
 public:
     ResultFileSinkOperator(OperatorBuilderBase* operator_builder, DataSink* 
sink);
 
diff --git a/be/src/pipeline/exec/result_sink_operator.h 
b/be/src/pipeline/exec/result_sink_operator.h
index bed6aed8969..7a66738e4b4 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -37,7 +37,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class ResultSinkOperator final : public 
DataSinkOperator<ResultSinkOperatorBuilder> {
+class ResultSinkOperator final : public 
DataSinkOperator<vectorized::VResultSink> {
 public:
     ResultSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink);
 
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 833ca55f095..3ad1ac1536a 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -43,7 +43,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class ScanOperator : public SourceOperator<ScanOperatorBuilder> {
+class ScanOperator : public SourceOperator<vectorized::VScanNode> {
 public:
     ScanOperator(OperatorBuilderBase* operator_builder, ExecNode* scan_node);
 
diff --git a/be/src/pipeline/exec/schema_scan_operator.h 
b/be/src/pipeline/exec/schema_scan_operator.h
index fdd3cb59312..2e00a2cbd4f 100644
--- a/be/src/pipeline/exec/schema_scan_operator.h
+++ b/be/src/pipeline/exec/schema_scan_operator.h
@@ -38,7 +38,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class SchemaScanOperator : public SourceOperator<SchemaScanOperatorBuilder> {
+class SchemaScanOperator : public SourceOperator<vectorized::VSchemaScanNode> {
 public:
     SchemaScanOperator(OperatorBuilderBase* operator_builder, ExecNode* 
scan_node);
 
diff --git a/be/src/pipeline/exec/select_operator.h 
b/be/src/pipeline/exec/select_operator.h
index d75c0a78481..84a66528875 100644
--- a/be/src/pipeline/exec/select_operator.h
+++ b/be/src/pipeline/exec/select_operator.h
@@ -35,7 +35,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class SelectOperator final : public StreamingOperator<SelectOperatorBuilder> {
+class SelectOperator final : public StreamingOperator<vectorized::VSelectNode> 
{
 public:
     SelectOperator(OperatorBuilderBase* operator_builder, ExecNode* 
select_node);
 };
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp 
b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index f1615df60da..440936d5951 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -50,7 +50,8 @@ OperatorPtr 
SetProbeSinkOperatorBuilder<is_intersect>::build_operator() {
 template <bool is_intersect>
 SetProbeSinkOperator<is_intersect>::SetProbeSinkOperator(OperatorBuilderBase* 
operator_builder,
                                                          int child_id, 
ExecNode* set_node)
-        : 
StreamingOperator<SetProbeSinkOperatorBuilder<is_intersect>>(operator_builder, 
set_node),
+        : 
StreamingOperator<vectorized::VSetOperationNode<is_intersect>>(operator_builder,
+                                                                         
set_node),
           _child_id(child_id) {}
 
 template <bool is_intersect>
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h 
b/be/src/pipeline/exec/set_probe_sink_operator.h
index 6f453ff31fc..6ac09ccfd9d 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -54,7 +54,7 @@ private:
 };
 
 template <bool is_intersect>
-class SetProbeSinkOperator : public 
StreamingOperator<SetProbeSinkOperatorBuilder<is_intersect>> {
+class SetProbeSinkOperator : public 
StreamingOperator<vectorized::VSetOperationNode<is_intersect>> {
 public:
     SetProbeSinkOperator(OperatorBuilderBase* operator_builder, int child_id, 
ExecNode* set_node);
 
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp 
b/be/src/pipeline/exec/set_sink_operator.cpp
index d46c6922322..367ff41acce 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -42,7 +42,7 @@ OperatorPtr 
SetSinkOperatorBuilder<is_intersect>::build_operator() {
 template <bool is_intersect>
 SetSinkOperator<is_intersect>::SetSinkOperator(
         OperatorBuilderBase* builder, 
vectorized::VSetOperationNode<is_intersect>* set_node)
-        : StreamingOperator<SetSinkOperatorBuilder<is_intersect>>(builder, 
set_node) {}
+        : 
StreamingOperator<vectorized::VSetOperationNode<is_intersect>>(builder, 
set_node) {}
 
 template class SetSinkOperatorBuilder<true>;
 template class SetSinkOperatorBuilder<false>;
diff --git a/be/src/pipeline/exec/set_sink_operator.h 
b/be/src/pipeline/exec/set_sink_operator.h
index 635d1ee8675..b851e18ce5b 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -49,7 +49,7 @@ public:
 };
 
 template <bool is_intersect>
-class SetSinkOperator : public 
StreamingOperator<SetSinkOperatorBuilder<is_intersect>> {
+class SetSinkOperator : public 
StreamingOperator<vectorized::VSetOperationNode<is_intersect>> {
 public:
     SetSinkOperator(OperatorBuilderBase* operator_builder,
                     vectorized::VSetOperationNode<is_intersect>* set_node);
diff --git a/be/src/pipeline/exec/set_source_operator.cpp 
b/be/src/pipeline/exec/set_source_operator.cpp
index e8a73c00ad6..e74e8b22ec0 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -42,7 +42,7 @@ OperatorPtr 
SetSourceOperatorBuilder<is_intersect>::build_operator() {
 template <bool is_intersect>
 SetSourceOperator<is_intersect>::SetSourceOperator(
         OperatorBuilderBase* builder, 
vectorized::VSetOperationNode<is_intersect>* set_node)
-        : SourceOperator<SetSourceOperatorBuilder<is_intersect>>(builder, 
set_node) {}
+        : SourceOperator<vectorized::VSetOperationNode<is_intersect>>(builder, 
set_node) {}
 
 template class SetSourceOperatorBuilder<true>;
 template class SetSourceOperatorBuilder<false>;
diff --git a/be/src/pipeline/exec/set_source_operator.h 
b/be/src/pipeline/exec/set_source_operator.h
index 44800f23f41..caf3e447f56 100644
--- a/be/src/pipeline/exec/set_source_operator.h
+++ b/be/src/pipeline/exec/set_source_operator.h
@@ -45,7 +45,7 @@ public:
 };
 
 template <bool is_intersect>
-class SetSourceOperator : public 
SourceOperator<SetSourceOperatorBuilder<is_intersect>> {
+class SetSourceOperator : public 
SourceOperator<vectorized::VSetOperationNode<is_intersect>> {
 public:
     SetSourceOperator(OperatorBuilderBase* builder,
                       vectorized::VSetOperationNode<is_intersect>* set_node);
diff --git a/be/src/pipeline/exec/sort_sink_operator.h 
b/be/src/pipeline/exec/sort_sink_operator.h
index 2f5512e108b..64beb53ba9e 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -38,7 +38,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class SortSinkOperator final : public 
StreamingOperator<SortSinkOperatorBuilder> {
+class SortSinkOperator final : public StreamingOperator<vectorized::VSortNode> 
{
 public:
     SortSinkOperator(OperatorBuilderBase* operator_builder, ExecNode* 
sort_node);
 
diff --git a/be/src/pipeline/exec/sort_source_operator.h 
b/be/src/pipeline/exec/sort_source_operator.h
index efbddfd1050..216c6290700 100644
--- a/be/src/pipeline/exec/sort_source_operator.h
+++ b/be/src/pipeline/exec/sort_source_operator.h
@@ -39,7 +39,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class SortSourceOperator final : public SourceOperator<SortSourceOperatorBuilder> {
+class SortSourceOperator final : public SourceOperator<vectorized::VSortNode> {
 public:
     SortSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* sort_node);
     Status open(RuntimeState*) override { return Status::OK(); }
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
index a7fcdcf847b..16f28bbeb76 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
@@ -50,7 +50,7 @@ private:
     std::shared_ptr<DataQueue> _data_queue;
 };
 
-class StreamingAggSinkOperator final : public StreamingOperator<StreamingAggSinkOperatorBuilder> {
+class StreamingAggSinkOperator final : public StreamingOperator<vectorized::AggregationNode> {
 public:
     StreamingAggSinkOperator(OperatorBuilderBase* operator_builder, ExecNode*,
                              std::shared_ptr<DataQueue>);
diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.h b/be/src/pipeline/exec/streaming_aggregation_source_operator.h
index 3e05955470d..51e3c9ed972 100644
--- a/be/src/pipeline/exec/streaming_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.h
@@ -49,7 +49,7 @@ private:
     std::shared_ptr<DataQueue> _data_queue;
 };
 
-class StreamingAggSourceOperator final : public SourceOperator<StreamingAggSourceOperatorBuilder> {
+class StreamingAggSourceOperator final : public SourceOperator<vectorized::AggregationNode> {
 public:
     StreamingAggSourceOperator(OperatorBuilderBase*, ExecNode*, std::shared_ptr<DataQueue>);
     bool can_read() override;
diff --git a/be/src/pipeline/exec/table_function_operator.h b/be/src/pipeline/exec/table_function_operator.h
index 2198f336c8e..a6c90e3dec4 100644
--- a/be/src/pipeline/exec/table_function_operator.h
+++ b/be/src/pipeline/exec/table_function_operator.h
@@ -38,7 +38,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class TableFunctionOperator final : public StatefulOperator<TableFunctionOperatorBuilder> {
+class TableFunctionOperator final : public StatefulOperator<vectorized::VTableFunctionNode> {
 public:
     TableFunctionOperator(OperatorBuilderBase* operator_builder, ExecNode* node);
 
diff --git a/be/src/pipeline/exec/table_sink_operator.h b/be/src/pipeline/exec/table_sink_operator.h
index 429c350eda5..46843c23f87 100644
--- a/be/src/pipeline/exec/table_sink_operator.h
+++ b/be/src/pipeline/exec/table_sink_operator.h
@@ -33,7 +33,7 @@ public:
     OperatorPtr build_operator() override;
 };
 
-class TableSinkOperator final : public DataSinkOperator<TableSinkOperatorBuilder> {
+class TableSinkOperator final : public DataSinkOperator<DataSink> {
 public:
     TableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink)
             : DataSinkOperator(operator_builder, sink) {}
diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h
index f3031dd1015..ed51e8b7398 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -48,7 +48,7 @@ private:
     std::shared_ptr<DataQueue> _data_queue;
 };
 
-class UnionSinkOperator final : public StreamingOperator<UnionSinkOperatorBuilder> {
+class UnionSinkOperator final : public StreamingOperator<vectorized::VUnionNode> {
 public:
     UnionSinkOperator(OperatorBuilderBase* operator_builder, int child_id, ExecNode* node,
                       std::shared_ptr<DataQueue> queue);
diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h
index 8d1b25bc4c4..0c150a072b6 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -48,7 +48,7 @@ private:
     std::shared_ptr<DataQueue> _data_queue;
 };
 
-class UnionSourceOperator final : public SourceOperator<UnionSourceOperatorBuilder> {
+class UnionSourceOperator final : public SourceOperator<vectorized::VUnionNode> {
 public:
     UnionSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* node,
                         std::shared_ptr<DataQueue>);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to