Gabriel39 commented on code in PR #25140:
URL: https://github.com/apache/doris/pull/25140#discussion_r1351229718
##########
be/src/pipeline/exec/set_source_operator.h:
##########
@@ -52,5 +53,67 @@ class SetSourceOperator : public
SourceOperator<SetSourceOperatorBuilder<is_inte
Status open(RuntimeState* /*state*/) override { return Status::OK(); }
};
+template <bool is_intersect>
+class SetSourceOperatorX;
+
+template <bool is_intersect>
+class SetSourceLocalState final : public PipelineXLocalState<SetDependency> {
+public:
+ ENABLE_FACTORY_CREATOR(SetSourceLocalState);
+ using Base = PipelineXLocalState<SetDependency>;
+ using Parent = SetSourceOperatorX<is_intersect>;
+ SetSourceLocalState(RuntimeState* state, OperatorXBase* parent) :
Base(state, parent) {};
+
+ Status init(RuntimeState* state, LocalStateInfo& info) override {
+ _pull_timer = ADD_TIMER(profile(), "PullTime");
+ return Status::OK();
+ }
+
+private:
+ friend class SetSourceOperatorX<is_intersect>;
+ friend class OperatorX<SetSourceLocalState>;
+ RuntimeProfile::Counter* _pull_timer; // time to pull data
+};
+
+template <bool is_intersect>
+class SetSourceOperatorX final : public
OperatorX<SetSourceLocalState<is_intersect>> {
+public:
+ using Base = OperatorX<SetSourceLocalState<is_intersect>>;
+ // for non-delay tempalte instantiation
+ using OperatorXBase::id;
+ using typename Base::LocalState;
+
+ SetSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs)
+ : Base(pool, tnode, descs) {};
+ ~SetSourceOperatorX() override = default;
+
+ Dependency* wait_for_dependency(RuntimeState* state) override {
+ CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state);
+ return local_state._dependency->read_blocked_by();
+ }
+
+ [[nodiscard]] bool is_source() const override { return true; }
+
+ Status get_block(RuntimeState* state, vectorized::Block* block,
+ SourceState& source_state) override;
+
+private:
+ friend class SetSourceLocalState<is_intersect>;
+
+ void create_mutable_cols(SetSourceLocalState<is_intersect>& local_state,
+ vectorized::Block* output_block);
+
+ template <typename HashTableContext>
+ Status get_data_in_hashtable(SetSourceLocalState<is_intersect>&
local_state,
+ HashTableContext& hash_table_ctx,
vectorized::Block* output_block,
+ const int batch_size, SourceState&
source_state);
+
+ void add_result_columns(SetSourceLocalState<is_intersect>& local_state,
+ vectorized::RowRefListWithFlags& value, int&
block_size);
+
+ using Base::_conjuncts;
+ std::vector<vectorized::MutableColumnPtr> _mutable_cols;
Review Comment:
Mutable variables should be held in the local state instead of in this
global (shared) operator.
##########
be/src/pipeline/exec/set_source_operator.h:
##########
@@ -52,5 +53,67 @@ class SetSourceOperator : public
SourceOperator<SetSourceOperatorBuilder<is_inte
Status open(RuntimeState* /*state*/) override { return Status::OK(); }
};
+template <bool is_intersect>
+class SetSourceOperatorX;
+
+template <bool is_intersect>
+class SetSourceLocalState final : public PipelineXLocalState<SetDependency> {
+public:
+ ENABLE_FACTORY_CREATOR(SetSourceLocalState);
+ using Base = PipelineXLocalState<SetDependency>;
+ using Parent = SetSourceOperatorX<is_intersect>;
+ SetSourceLocalState(RuntimeState* state, OperatorXBase* parent) :
Base(state, parent) {};
+
+ Status init(RuntimeState* state, LocalStateInfo& info) override {
+ _pull_timer = ADD_TIMER(profile(), "PullTime");
+ return Status::OK();
+ }
+
+private:
+ friend class SetSourceOperatorX<is_intersect>;
+ friend class OperatorX<SetSourceLocalState>;
+ RuntimeProfile::Counter* _pull_timer; // time to pull data
+};
+
+template <bool is_intersect>
+class SetSourceOperatorX final : public
OperatorX<SetSourceLocalState<is_intersect>> {
+public:
+ using Base = OperatorX<SetSourceLocalState<is_intersect>>;
+ // for non-delay tempalte instantiation
+ using OperatorXBase::id;
+ using typename Base::LocalState;
+
+ SetSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs)
+ : Base(pool, tnode, descs) {};
+ ~SetSourceOperatorX() override = default;
+
+ Dependency* wait_for_dependency(RuntimeState* state) override {
+ CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state);
+ return local_state._dependency->read_blocked_by();
+ }
+
+ [[nodiscard]] bool is_source() const override { return true; }
+
+ Status get_block(RuntimeState* state, vectorized::Block* block,
+ SourceState& source_state) override;
+
+private:
+ friend class SetSourceLocalState<is_intersect>;
+
+ void create_mutable_cols(SetSourceLocalState<is_intersect>& local_state,
Review Comment:
Private functions should have names prefixed with `_`.
##########
be/src/pipeline/exec/set_source_operator.cpp:
##########
@@ -48,4 +48,116 @@ template class SetSourceOperatorBuilder<false>;
template class SetSourceOperator<true>;
template class SetSourceOperator<false>;
+template <bool is_intersect>
+Status SetSourceOperatorX<is_intersect>::get_block(RuntimeState* state,
vectorized::Block* block,
+ SourceState& source_state) {
+ RETURN_IF_CANCELLED(state);
+ CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ SCOPED_TIMER(local_state._pull_timer);
Review Comment:
We should also include this time in `total_timer`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]