alamb commented on code in PR #22571:
URL: https://github.com/apache/datafusion/pull/22571#discussion_r3312899353


##########
datafusion/physical-plan/src/aggregates/topk_stream.rs:
##########
@@ -209,17 +225,32 @@ impl Stream for GroupedTopKAggregateStream {
                     // Release the input pipeline's resources before emitting.
                     let input_schema = self.input.schema();
                     self.input = Box::pin(EmptyRecordBatchStream::new(input_schema));
-                    if self.priority_map.is_empty() {
+                    if self.priority_map.is_empty() && !self.null_group_seen {
                         trace!("partition {} emit None", self.partition);
+                        self.done = true;
                         return Poll::Ready(None);
                     }
                     let batch = {
                         let _timer = emitting_time.timer();
-                        let mut cols = self.priority_map.emit()?;
+                        let mut cols = if self.priority_map.is_empty() {
+                            vec![]
+                        } else {
+                            self.priority_map.emit()?
+                        };
                         // For DISTINCT case (no aggregate expressions), only use the group key column
                         // since the schema only has one field and key/value are the same
-                        if self.aggregate_arguments.is_empty() {
+                        if self.is_distinct() {

Review Comment:
   Can we encapsulate some of this logic in a helper function, perhaps, to keep this code easy to read?
   
   It would also help to add some comments here explaining why concat is needed.



##########
datafusion/physical-plan/src/aggregates/topk_stream.rs:
##########
@@ -138,6 +148,9 @@ impl GroupedTopKAggregateStream {
         let has_nulls = vals.null_count() > 0;
         for row_idx in 0..len {
             if has_nulls && vals.is_null(row_idx) {
+                if self.is_distinct() {
+                    self.null_group_seen = true;

Review Comment:
   This is in the (hot) inner loop, and I worry about the performance implications of this change.
   
   Is it enough to check outside the loop? Something like:
   
   ```rust
           let has_nulls = vals.null_count() > 0;
           if has_nulls && self.is_distinct() {
               self.null_group_seen = true;
           }
           for row_idx in 0..len {
               ...
           }
   ```
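
The hoisting suggested above should be behavior-preserving: `has_nulls` is true only when at least one row is null, so the per-row version would have set the flag at some point in the loop anyway. A minimal, self-contained sketch of that argument follows. `SketchState`, `inserted`, and the `Option<i64>` slice are hypothetical stand-ins for the real stream state and Arrow arrays, not DataFusion APIs.

```rust
/// Hypothetical stand-in for the stream's state; not the real DataFusion type.
#[derive(Debug, Default, PartialEq)]
struct SketchState {
    is_distinct: bool,
    null_group_seen: bool,
    /// Non-null values that would be handed to the priority map.
    inserted: Vec<i64>,
}

impl SketchState {
    /// Mirrors the diff under review: the flag is set inside the row loop.
    fn intern_per_row(&mut self, vals: &[Option<i64>]) {
        let has_nulls = vals.iter().any(|v| v.is_none());
        for v in vals {
            if has_nulls && v.is_none() {
                if self.is_distinct {
                    self.null_group_seen = true;
                }
                continue;
            }
            if let Some(x) = v {
                self.inserted.push(*x);
            }
        }
    }

    /// Mirrors the reviewer's suggestion: one check before the loop.
    fn intern_hoisted(&mut self, vals: &[Option<i64>]) {
        let has_nulls = vals.iter().any(|v| v.is_none());
        if has_nulls && self.is_distinct {
            self.null_group_seen = true;
        }
        for v in vals {
            if let Some(x) = v {
                self.inserted.push(*x);
            }
        }
    }
}

/// Runs both variants on the same input and compares the resulting state.
fn variants_agree(vals: &[Option<i64>], is_distinct: bool) -> bool {
    let mut a = SketchState { is_distinct, ..Default::default() };
    let mut b = SketchState { is_distinct, ..Default::default() };
    a.intern_per_row(vals);
    b.intern_hoisted(vals);
    a == b
}
```

Calling `variants_agree` with batches that do and do not contain nulls, in both distinct and non-distinct mode, is one way to check the equivalence. Whether the hoisted form is measurably faster in the real stream would need a benchmark.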
   



##########
datafusion/physical-plan/src/aggregates/topk_stream.rs:
##########
@@ -128,6 +134,10 @@ impl RecordBatchStream for GroupedTopKAggregateStream {
 }
 
 impl GroupedTopKAggregateStream {
+    fn is_distinct(&self) -> bool {
+        self.aggregate_arguments.is_empty()

Review Comment:
   I think you can get no aggregates for just a normal `SELECT x,y,z GROUP BY x,y,z` type query, though maybe that has the same semantics.
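
As a rough illustration of the "same semantics" remark, in standard SQL a `GROUP BY` over all selected columns with no aggregates returns the same set of rows as `SELECT DISTINCT`, with NULL keys collapsing into a single group in both. The sketch below models that with plain Rust collections. The `Row` type and both function names are hypothetical, and it says nothing about whether the stream's null-group handling is correct for both query shapes.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// A row of nullable group-key columns; `None` models SQL NULL.
type Row = (Option<i32>, Option<i32>);

/// Models `SELECT DISTINCT x, y FROM t`.
fn select_distinct(rows: &[Row]) -> Vec<Row> {
    let unique: BTreeSet<Row> = rows.iter().copied().collect();
    unique.into_iter().collect()
}

/// Models `SELECT x, y FROM t GROUP BY x, y` with no aggregate expressions.
fn group_by_without_aggregates(rows: &[Row]) -> Vec<Row> {
    // Each group remembers its member row indices, but only the key is emitted.
    let mut groups: BTreeMap<Row, Vec<usize>> = BTreeMap::new();
    for (idx, row) in rows.iter().enumerate() {
        groups.entry(*row).or_default().push(idx);
    }
    groups.into_keys().collect()
}
```

Both functions return one row per distinct key, including a single row for the all-NULL key, so a flag derived only from "no aggregate expressions" would be true for either query shape.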



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

