ahmed-mez commented on code in PR #18906:
URL: https://github.com/apache/datafusion/pull/18906#discussion_r2556052719


##########
datafusion/physical-plan/src/aggregates/group_values/row.rs:
##########
@@ -206,29 +233,43 @@ impl GroupValues for GroupValuesRows {
                 output
             }
             EmitTo::First(n) => {
-                let groups_rows = group_values.iter().take(n);
-                let output = self.row_converter.convert_rows(groups_rows)?;
-                // Clear out first n group keys by copying them to a new Rows.
-                // TODO file some ticket in arrow-rs to make this more 
efficient?
-                let mut new_group_values = self.row_converter.empty_rows(0, 0);
-                for row in group_values.iter().skip(n) {
-                    new_group_values.push(row);
-                }
-                std::mem::swap(&mut new_group_values, &mut group_values);
-
-                self.map.retain(|(_exists_hash, group_idx)| {
-                    // Decrement group index by n
-                    match group_idx.checked_sub(n) {
-                        // Group index was >= n, shift value down
-                        Some(sub) => {
-                            *group_idx = sub;
-                            true
-                        }
-                        // Group index was < n, so remove from table
-                        None => false,
+                if self.drain_mode {
+                    let start = self.emission_offset;
+                    let end = std::cmp::min(start + n, 
group_values.num_rows());
+                    let iter = group_values.iter().skip(start).take(end - 
start);
+                    let output = self.row_converter.convert_rows(iter)?;
+                    self.emission_offset = end;
+                    if self.emission_offset == group_values.num_rows() {
+                        group_values.clear();
+                        self.emission_offset = 0;

Review Comment:
   The `emission_offset` optimization is just to avoid the expensive "copy 
remaining rows" operation that the old `EmitTo::First` path did during input 
processing. The real fix is replacing `emit(EmitTo::All)` (which blocks for 
seconds on large group counts) with incremental drain 
https://github.com/apache/datafusion/pull/18906/files#diff-69c8ecaca5e2c7005f2ed1facaa41f80b45bfd006f2357e53ff3072f535c287dR1196



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to