alamb commented on code in PR #18712:
URL: https://github.com/apache/datafusion/pull/18712#discussion_r2545674020


##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -709,9 +709,21 @@ impl Stream for GroupedHashAggregateStream {
                                 break 'reading_input;
                             }
 
-                            self.emit_early_if_necessary()?;
+                            // Check if we should switch to skip aggregation mode
+                            // It's important that we do this before we early emit since we've
+                            // already updated the probe.
+                            if let Some(new_state) = self.switch_to_skip_aggregation()? {

Review Comment:
   I read this new logic carefully, and moving the switch to the skip 
aggregation state right next to the check makes a lot of sense to me.
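
One way to picture the ordering discussed here is the toy model below. It is only a sketch under stated assumptions: `ToyStream`, `ExecState`, `flush`, and `after_batch` are hypothetical names invented for illustration, and only `switch_to_skip_aggregation` and `emit_early_if_necessary` borrow their names from the diff. It does not reproduce DataFusion's internals; it only shows the idea that the skip-aggregation check runs first and, when it fires, its state change is the single decision made for that batch.

```rust
// Toy model only: these types are hypothetical and are not DataFusion's.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ExecState {
    ReadingInput,
    ProducingOutput,
    SkippingAggregation,
}

struct ToyStream {
    buffered_groups: usize,      // groups currently held in the hash table
    emitted_groups: usize,       // total groups handed downstream so far
    probe_says_skip: bool,       // result of the already-updated probe
    early_emit_threshold: usize, // stand-in for a memory-pressure check
}

impl ToyStream {
    /// Emit every buffered group exactly once and clear the buffer.
    fn flush(&mut self) {
        self.emitted_groups += self.buffered_groups;
        self.buffered_groups = 0;
    }

    /// Returns the new state if the probe decided to stop aggregating.
    fn switch_to_skip_aggregation(&mut self) -> Option<ExecState> {
        if self.probe_says_skip {
            self.flush();
            Some(ExecState::SkippingAggregation)
        } else {
            None
        }
    }

    /// Returns the new state if the buffer is large enough to emit early.
    fn emit_early_if_necessary(&mut self) -> Option<ExecState> {
        if self.buffered_groups > self.early_emit_threshold {
            self.flush();
            Some(ExecState::ProducingOutput)
        } else {
            None
        }
    }

    /// Decide the next state after one input batch: the skip check runs
    /// first, and early emission is only considered if it did not fire.
    fn after_batch(&mut self) -> ExecState {
        if let Some(new_state) = self.switch_to_skip_aggregation() {
            return new_state;
        }
        if let Some(new_state) = self.emit_early_if_necessary() {
            return new_state;
        }
        ExecState::ReadingInput
    }
}

fn main() {
    // Both conditions hold at once: the probe says skip and the buffer
    // exceeds the early-emit threshold.
    let mut stream = ToyStream {
        buffered_groups: 100,
        emitted_groups: 0,
        probe_says_skip: true,
        early_emit_threshold: 50,
    };
    let next = stream.after_batch();
    assert_eq!(next, ExecState::SkippingAggregation);
    assert_eq!(stream.emitted_groups, 100);
    println!("{:?}, emitted {} groups", next, stream.emitted_groups);
}
```

In this sketch, returning as soon as one check yields a new state means the buffered groups are flushed by exactly one code path per batch, which is the property the test in this PR asserts on through the group count.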



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1239,3 +1270,123 @@ impl GroupedHashAggregateStream {
         Ok(states_batch)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::test::TestMemoryExec;
+    use arrow::array::{Int32Array, Int64Array};
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_execution::runtime_env::RuntimeEnvBuilder;
+    use datafusion_execution::TaskContext;
+    use datafusion_functions_aggregate::count::count_udaf;
+    use datafusion_physical_expr::aggregate::AggregateExprBuilder;
+    use datafusion_physical_expr::expressions::col;
+    use std::sync::Arc;
+
+    #[tokio::test]
+    async fn test_double_emission_race_condition_bug() -> Result<()> {

Review Comment:
   I verified that this test covers the code in this PR, as it fails without 
the code changes:
   
   ```
   thread 'aggregates::row_hash::tests::test_double_emission_race_condition_bug' (41064140) panicked at datafusion/physical-plan/src/aggregates/row_hash.rs:1354:9:
   assertion `left == right` failed: Unexpected number of groups
     left: 100
    right: 1124
   stack backtrace:
   ```


