Gabriel39 commented on code in PR #24286:
URL: https://github.com/apache/doris/pull/24286#discussion_r1324014187


##########
be/src/pipeline/exec/union_sink_operator.h:
##########
@@ -64,5 +65,121 @@ class UnionSinkOperator final : public 
StreamingOperator<UnionSinkOperatorBuilde
     std::shared_ptr<DataQueue> _data_queue;
     std::unique_ptr<vectorized::Block> _output_block;
 };
+
+class UnionSinkOperatorX;
+class UnionSinkLocalState final : public 
PipelineXSinkLocalState<UnionDependency> {
+public:
+    ENABLE_FACTORY_CREATOR(UnionSinkLocalState);
+    UnionSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : Base(parent, state), _child_idx(0), _child_row_idx(0) {}
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    friend class UnionSinkOperatorX;
+    using Base = PipelineXSinkLocalState<UnionDependency>;
+    using Parent = UnionSinkOperatorX;
+    void set(std::shared_ptr<DataQueue> que) { _shared_state->_data_queue = 
que; }

Review Comment:
   Use a function name with a clearer meaning instead of `set`.



##########
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h:
##########
@@ -156,6 +156,7 @@ class PipelineXFragmentContext : public 
PipelineFragmentContext {
 
     std::map<UniqueId, RuntimeState*> _instance_id_to_runtime_state;
     std::mutex _state_map_lock;
+    std::map<int, std::vector<PipelinePtr>> _union_child_pipelines;

Review Comment:
   `_union_child_pipelines` serves a similar purpose to 
`_build_side_pipelines`. Can you unify the two?



##########
be/src/pipeline/exec/union_sink_operator.cpp:
##########
@@ -94,4 +94,94 @@ Status UnionSinkOperator::close(RuntimeState* state) {
     return StreamingOperator::close(state);
 }
 
+Status UnionSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& 
info) {
+    Base::init(state, info);
+    auto& p = _parent->cast<Parent>();
+    auto copy_expr_ctxs = [&](auto& _expr_ctxs, auto& _oth_expr_ctxs) {
+        _expr_ctxs.resize(_oth_expr_ctxs.size());
+        for (size_t i = 0; i < _expr_ctxs.size(); i++) {
+            RETURN_IF_ERROR(_oth_expr_ctxs[i]->clone(state, _expr_ctxs[i]));
+        }
+        return Status::OK();
+    };
+    // copy_expr_ctxs(_const_expr,p._const_expr);
+    copy_expr_ctxs(_child_expr, p._child_expr);

Review Comment:
   Why use a lambda here?



##########
be/src/pipeline/exec/union_sink_operator.h:
##########
@@ -64,5 +65,121 @@ class UnionSinkOperator final : public 
StreamingOperator<UnionSinkOperatorBuilde
     std::shared_ptr<DataQueue> _data_queue;
     std::unique_ptr<vectorized::Block> _output_block;
 };
+
+class UnionSinkOperatorX;
+class UnionSinkLocalState final : public 
PipelineXSinkLocalState<UnionDependency> {
+public:
+    ENABLE_FACTORY_CREATOR(UnionSinkLocalState);
+    UnionSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : Base(parent, state), _child_idx(0), _child_row_idx(0) {}
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    friend class UnionSinkOperatorX;
+    using Base = PipelineXSinkLocalState<UnionDependency>;
+    using Parent = UnionSinkOperatorX;
+    void set(std::shared_ptr<DataQueue> que) { _shared_state->_data_queue = 
que; }
+
+private:
+    std::unique_ptr<vectorized::Block> _output_block;
+
+    /// Const exprs materialized by this node. These exprs don't refer to any 
children.
+    /// Only materialized by the first fragment instance to avoid duplication.
+    vectorized::VExprContextSPtrs _const_expr;
+
+    /// Exprs materialized by this node. The i-th result expr list refers to 
the i-th child.
+    vectorized::VExprContextSPtrs _child_expr;
+
+    /// Index of current child.
+    [[maybe_unused]] int _child_idx;

Review Comment:
   Is `_child_idx` never used?



##########
be/src/pipeline/exec/union_sink_operator.cpp:
##########
@@ -94,4 +94,94 @@ Status UnionSinkOperator::close(RuntimeState* state) {
     return StreamingOperator::close(state);
 }
 
+Status UnionSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& 
info) {
+    Base::init(state, info);
+    auto& p = _parent->cast<Parent>();
+    auto copy_expr_ctxs = [&](auto& _expr_ctxs, auto& _oth_expr_ctxs) {
+        _expr_ctxs.resize(_oth_expr_ctxs.size());
+        for (size_t i = 0; i < _expr_ctxs.size(); i++) {
+            RETURN_IF_ERROR(_oth_expr_ctxs[i]->clone(state, _expr_ctxs[i]));
+        }
+        return Status::OK();
+    };
+    // copy_expr_ctxs(_const_expr,p._const_expr);

Review Comment:
   delete this line



##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -157,6 +157,21 @@ class SortDependency final : public Dependency {
     SortSharedState _sort_state;
 };
 
+struct UnionSharedState {
+public:
+    std::shared_ptr<DataQueue> _data_queue = std::make_shared<DataQueue>(3);

Review Comment:
   Why is `3` hard-coded?



##########
be/src/pipeline/exec/union_source_operator.cpp:
##########
@@ -97,5 +97,47 @@ Status UnionSourceOperator::get_block(RuntimeState* state, 
vectorized::Block* bl
 
     return Status::OK();
 }
+
+Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    auto& p = _parent->cast<Parent>();
+    std::shared_ptr<DataQueue> data_queue = 
std::make_shared<DataQueue>(p._child_size);
+    _shared_state->_data_queue.swap(data_queue);
+    return Status::OK();
+}
+
+Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* 
block,
+                                       SourceState& source_state) {
+    auto& local_state = 
state->get_local_state(id())->cast<UnionSourceLocalState>();
+    bool eos = false;
+    pull_data(state, block, &eos);
+    //have exectue const expr, queue have no data any more, and child could be 
colsed
+    if ((!_has_data(state) && 
local_state._shared_state->_data_queue->is_all_finish())) {
+        source_state = SourceState::FINISHED;
+    } else if (_has_data(state)) {
+        source_state = SourceState::MORE_DATA;
+    } else {
+        source_state = SourceState::DEPEND_ON_SOURCE;
+    }
+    return Status::OK();
+}
+
+Status UnionSourceOperatorX::pull_data(RuntimeState* state, vectorized::Block* 
block, bool* eos) {

Review Comment:
   There is no need for a separate function. Move this into `get_block`.



##########
be/src/pipeline/exec/union_sink_operator.h:
##########
@@ -64,5 +65,121 @@ class UnionSinkOperator final : public 
StreamingOperator<UnionSinkOperatorBuilde
     std::shared_ptr<DataQueue> _data_queue;
     std::unique_ptr<vectorized::Block> _output_block;
 };
+
+class UnionSinkOperatorX;
+class UnionSinkLocalState final : public 
PipelineXSinkLocalState<UnionDependency> {
+public:
+    ENABLE_FACTORY_CREATOR(UnionSinkLocalState);
+    UnionSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : Base(parent, state), _child_idx(0), _child_row_idx(0) {}
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    friend class UnionSinkOperatorX;
+    using Base = PipelineXSinkLocalState<UnionDependency>;
+    using Parent = UnionSinkOperatorX;
+    void set(std::shared_ptr<DataQueue> que) { _shared_state->_data_queue = 
que; }
+
+private:
+    std::unique_ptr<vectorized::Block> _output_block;
+
+    /// Const exprs materialized by this node. These exprs don't refer to any 
children.
+    /// Only materialized by the first fragment instance to avoid duplication.
+    vectorized::VExprContextSPtrs _const_expr;
+
+    /// Exprs materialized by this node. The i-th result expr list refers to 
the i-th child.
+    vectorized::VExprContextSPtrs _child_expr;
+
+    /// Index of current child.
+    [[maybe_unused]] int _child_idx;
+
+    /// Index of current row in child_row_block_.
+    [[maybe_unused]] int _child_row_idx;
+};
+
+class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> 
{
+public:
+    using Base = DataSinkOperatorX<UnionSinkLocalState>;
+
+    friend class UnionSinkLocalState;
+    UnionSinkOperatorX(int child_id, int sink_id, ObjectPool* pool, const 
TPlanNode& tnode,
+                       const DescriptorTbl& descs);
+    ~UnionSinkOperatorX() override = default;
+    Status init(const TDataSink& tsink) override {
+        return Status::InternalError("{} should not init with TDataSink");
+    }
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+
+    Status prepare(RuntimeState* state) override;
+    Status open(RuntimeState* state) override;
+
+    Status sink(RuntimeState* state, vectorized::Block* in_block,
+                SourceState source_state) override;
+
+    bool can_write(RuntimeState* state) override { return true; }
+
+    Status alloc_resource(RuntimeState* state) {
+        // open const expr lists.
+        RETURN_IF_ERROR(vectorized::VExpr::open(_const_expr, state));
+
+        // open result expr lists.
+        RETURN_IF_ERROR(vectorized::VExpr::open(_child_expr, state));
+
+        return Status::OK();
+    }
+
+private:
+    int get_first_materialized_child_idx() const { return 
_first_materialized_child_idx; }

Review Comment:
   ```suggestion
       int _get_first_materialized_child_idx() const { return 
_first_materialized_child_idx; }
   ```



##########
be/src/pipeline/exec/union_sink_operator.h:
##########
@@ -64,5 +65,121 @@ class UnionSinkOperator final : public 
StreamingOperator<UnionSinkOperatorBuilde
     std::shared_ptr<DataQueue> _data_queue;
     std::unique_ptr<vectorized::Block> _output_block;
 };
+
+class UnionSinkOperatorX;
+class UnionSinkLocalState final : public 
PipelineXSinkLocalState<UnionDependency> {
+public:
+    ENABLE_FACTORY_CREATOR(UnionSinkLocalState);
+    UnionSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : Base(parent, state), _child_idx(0), _child_row_idx(0) {}
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    friend class UnionSinkOperatorX;
+    using Base = PipelineXSinkLocalState<UnionDependency>;
+    using Parent = UnionSinkOperatorX;
+    void set(std::shared_ptr<DataQueue> que) { _shared_state->_data_queue = 
que; }
+
+private:
+    std::unique_ptr<vectorized::Block> _output_block;
+
+    /// Const exprs materialized by this node. These exprs don't refer to any 
children.
+    /// Only materialized by the first fragment instance to avoid duplication.
+    vectorized::VExprContextSPtrs _const_expr;
+
+    /// Exprs materialized by this node. The i-th result expr list refers to 
the i-th child.
+    vectorized::VExprContextSPtrs _child_expr;
+
+    /// Index of current child.
+    [[maybe_unused]] int _child_idx;
+
+    /// Index of current row in child_row_block_.
+    [[maybe_unused]] int _child_row_idx;
+};
+
+class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> 
{
+public:
+    using Base = DataSinkOperatorX<UnionSinkLocalState>;
+
+    friend class UnionSinkLocalState;
+    UnionSinkOperatorX(int child_id, int sink_id, ObjectPool* pool, const 
TPlanNode& tnode,
+                       const DescriptorTbl& descs);
+    ~UnionSinkOperatorX() override = default;
+    Status init(const TDataSink& tsink) override {
+        return Status::InternalError("{} should not init with TDataSink");
+    }
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+
+    Status prepare(RuntimeState* state) override;
+    Status open(RuntimeState* state) override;
+
+    Status sink(RuntimeState* state, vectorized::Block* in_block,
+                SourceState source_state) override;
+
+    bool can_write(RuntimeState* state) override { return true; }
+
+    Status alloc_resource(RuntimeState* state) {

Review Comment:
   We do not need `alloc_resource` in pipelineX. Just move this into `open`



##########
be/src/pipeline/pipeline_x/operator.h:
##########
@@ -473,6 +478,7 @@ class DataSinkOperatorXBase : public OperatorBase {
 
 protected:
     const int _id;
+    const int _source_id;

Review Comment:
   ```suggestion
       const int _dest_id;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to