Gabriel39 commented on code in PR #24544:
URL: https://github.com/apache/doris/pull/24544#discussion_r1329586222


##########
be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp:
##########
@@ -94,4 +95,182 @@ OperatorPtr 
DistinctStreamingAggSinkOperatorBuilder::build_operator() {
     return std::make_shared<DistinctStreamingAggSinkOperator>(this, _node, 
_data_queue);
 }
 
+DistinctStreamingAggSinkLocalState::DistinctStreamingAggSinkLocalState(
+        DataSinkOperatorXBase* parent, RuntimeState* state)
+        : AggSinkLocalState<AggDependency, 
DistinctStreamingAggSinkLocalState>(parent, state) {
+    dummy_mapped_data = std::make_shared<char>('A');
+}
+
+Status DistinctStreamingAggSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {

Review Comment:
   This method does not need to be overridden.



##########
be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp:
##########
@@ -94,4 +95,182 @@ OperatorPtr 
DistinctStreamingAggSinkOperatorBuilder::build_operator() {
     return std::make_shared<DistinctStreamingAggSinkOperator>(this, _node, 
_data_queue);
 }
 
+DistinctStreamingAggSinkLocalState::DistinctStreamingAggSinkLocalState(
+        DataSinkOperatorXBase* parent, RuntimeState* state)
+        : AggSinkLocalState<AggDependency, 
DistinctStreamingAggSinkLocalState>(parent, state) {
+    dummy_mapped_data = std::make_shared<char>('A');

Review Comment:
   Initialize this after the parent's ctor



##########
be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h:
##########
@@ -63,5 +64,19 @@ class DistinctStreamingAggSourceOperator final
     std::shared_ptr<DataQueue> _data_queue;
 };
 
+class DistinctStreamingAggSourceOperatorX final : public AggSourceOperatorX {
+public:
+    using Base = AggSourceOperatorX;
+    DistinctStreamingAggSourceOperatorX(ObjectPool* pool, const TPlanNode& 
tnode,
+                                        const DescriptorTbl& descs);
+    ~DistinctStreamingAggSourceOperatorX() = default;
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+
+    Status get_block(RuntimeState* state, vectorized::Block* block,
+                     SourceState& source_state) override;
+    bool _is_streaming_preagg = false;
+};
+
 } // namespace pipeline
 } // namespace doris

Review Comment:
   Append an empty line



##########
be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h:
##########
@@ -72,5 +74,44 @@ class DistinctStreamingAggSinkOperator final
     std::unique_ptr<vectorized::Block> _output_block = 
vectorized::Block::create_unique();
 };
 
+class DistinctStreamingAggSinkOperatorX;
+
+class DistinctStreamingAggSinkLocalState final
+        : public AggSinkLocalState<AggDependency, 
DistinctStreamingAggSinkLocalState> {
+public:
+    using Parent = DistinctStreamingAggSinkOperatorX;
+    using Base = AggSinkLocalState<AggDependency, 
DistinctStreamingAggSinkLocalState>;
+    ENABLE_FACTORY_CREATOR(DistinctStreamingAggSinkLocalState);
+    DistinctStreamingAggSinkLocalState(DataSinkOperatorXBase* parent, 
RuntimeState* state);
+
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status close(RuntimeState* state) override;
+    Status _distinct_pre_agg_with_serialized_key(vectorized::Block* in_block,
+                                                 vectorized::Block* out_block);
+
+private:
+    friend class DistinctStreamingAggSinkOperatorX;
+    void _emplace_into_hash_table_to_distinct(vectorized::IColumn::Selector& 
distinct_row,
+                                              vectorized::ColumnRawPtrs& 
key_columns,
+                                              const size_t num_rows);
+
+    std::unique_ptr<vectorized::Block> _output_block = 
vectorized::Block::create_unique();
+    std::shared_ptr<char> dummy_mapped_data = nullptr;
+    vectorized::IColumn::Selector _distinct_row;
+    int64_t _output_distinct_rows = 0;
+};
+
+class DistinctStreamingAggSinkOperatorX final
+        : public AggSinkOperatorX<DistinctStreamingAggSinkLocalState> {
+public:
+    DistinctStreamingAggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode,
+                                      const DescriptorTbl& descs);
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    Status sink(RuntimeState* state, vectorized::Block* in_block,
+                SourceState source_state) override;
+
+    bool can_write(RuntimeState* state) override;
+};
+
 } // namespace pipeline
 } // namespace doris

Review Comment:
   Append an empty line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to