This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 28acfaed2b8c12241750702b7eb7dd86ef482df5
Author: Mryange <[email protected]>
AuthorDate: Mon Apr 8 11:12:26 2024 +0800

    [fix](pipeline)group by and output is empty (#33192)
---
 be/src/pipeline/pipeline_fragment_context.cpp | 4 ++--
 be/src/vec/exec/vaggregation_node.cpp         | 4 ++--
 be/src/vec/exec/vaggregation_node.h           | 2 ++
 3 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index a32d777788d..e53492c9fa0 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -556,10 +556,10 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
         break;
     }
     case TPlanNodeType::AGGREGATION_NODE: {
-        auto* agg_node = dynamic_cast<vectorized::AggregationNode*>(node);
+        auto* agg_node = static_cast<vectorized::AggregationNode*>(node);
         auto new_pipe = add_pipeline();
         RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipe));
-        if (agg_node->is_probe_expr_ctxs_empty() && node->row_desc().num_slots() == 0) {
+        if (agg_node->is_probe_expr_ctxs_empty() && agg_node->agg_output_desc()->slots().empty()) {
             return Status::InternalError("Illegal aggregate node " +
                                          std::to_string(agg_node->id()) +
                                          ": group by and output is empty");
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index d46aa3f5736..67072e2f60f 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -119,6 +119,8 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
     _is_first_phase = tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase;
     _agg_data = std::make_unique<AggregatedDataVariants>();
     _agg_arena_pool = std::make_unique<Arena>();
+    _intermediate_tuple_desc = descs.get_tuple_descriptor(_intermediate_tuple_id);
+    _output_tuple_desc = descs.get_tuple_descriptor(_output_tuple_id);
 }
 
 AggregationNode::~AggregationNode() = default;
@@ -250,8 +252,6 @@ Status AggregationNode::prepare_profile(RuntimeState* state) {
     _hash_table_input_counter = ADD_COUNTER(runtime_profile(), "HashTableInputCount", TUnit::UNIT);
     _max_row_size_counter = ADD_COUNTER(runtime_profile(), "MaxRowSizeInBytes", TUnit::UNIT);
     COUNTER_SET(_max_row_size_counter, (int64_t)0);
-    _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
-    _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
     DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
     RETURN_IF_ERROR(VExpr::prepare(_probe_expr_ctxs, state, child(0)->row_desc()));
 
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index f89bbb9d780..93723e54b82 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -424,6 +424,8 @@ public:
     /// the preagg should pass through any rows it can't fit in its tables.
     bool _should_expand_preagg_hash_tables();
 
+    TupleDescriptor* agg_output_desc() { return _output_tuple_desc; }
+
 protected:
     bool _is_streaming_preagg;
     bool _child_eos = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to