xanderbailey opened a new issue, #18701:
URL: https://github.com/apache/datafusion/issues/18701

   ### Describe the bug
   
   There is a race condition in `AggregateExec` if two conditions are met on 
the same record batch.
   1. Memory pressure is high, which triggers `emit_early_if_necessary`.
   The first `n` groups, where `let n = self.group_values.len() / self.batch_size * self.batch_size;`, are 
emitted, leaving the remainder still in the hash table. It then 
sets `self.exec_state = ExecutionState::ProducingOutput(batch);`.
   2. `skip_aggregation_probe` also decides that aggregation should be skipped 
because cardinality is high, so `switch_to_skip_aggregation` is called. This emits the 
remainder into a new batch and overwrites `self.exec_state = 
ExecutionState::ProducingOutput(batch);` within the same loop iteration. When the loop 
finishes, the state moves to `ProducingOutput`, which returns only the 
remainder batch; the batch emitted in step 1 is lost.
   
   
   ### To Reproduce
   
   ```
   #[cfg(test)]
   mod tests {
       use super::*;
       use crate::test::TestMemoryExec;
       use arrow::array::{Int32Array, Int64Array};
       use arrow::datatypes::{DataType, Field, Schema};
       use datafusion_execution::runtime_env::RuntimeEnvBuilder;
       use datafusion_execution::TaskContext;
       use datafusion_functions_aggregate::count::count_udaf;
       use datafusion_physical_expr::aggregate::AggregateExprBuilder;
       use datafusion_physical_expr::expressions::col;
       use std::sync::Arc;
   
       #[tokio::test]
       async fn test_double_emission_race_condition_bug() -> Result<()> {
           // This test specifically reproduces the double emission race 
condition
           // where emit_early_if_necessary() and switch_to_skip_aggregation()
           // both emit in the same loop iteration, causing data loss
   
           let schema = Arc::new(Schema::new(vec![
               Field::new("group_col", DataType::Int32, false),
               Field::new("value_col", DataType::Int64, false),
           ]));
   
           // Create data that will trigger BOTH conditions in the same 
iteration:
           // 1. More groups than batch_size (triggers early emission when 
memory pressure hits)
           // 2. High cardinality ratio (triggers skip aggregation)
           let batch_size = 1024; // We'll set this in session config
           let num_groups = batch_size + 100; // Slightly more than batch_size 
(1124 groups)
   
           // Create exactly 1 row per group = 100% cardinality ratio
           let group_ids: Vec<i32> = (0..num_groups as i32).collect();
           let values: Vec<i64> = vec![1; num_groups];
   
           let batch = RecordBatch::try_new(
               Arc::clone(&schema),
               vec![
                   Arc::new(Int32Array::from(group_ids)),
                   Arc::new(Int64Array::from(values)),
               ],
           )?;
   
           let input_partitions = vec![vec![batch]];
   
           // Create constrained memory to trigger early emission but not 
completely fail
           let runtime = RuntimeEnvBuilder::default()
               .with_memory_limit(1024, 1.0) // 1 KiB (1024 bytes) - enough to start but 
will trigger pressure
               .build_arc()?;
   
           let mut task_ctx = TaskContext::default().with_runtime(runtime);
   
           // Configure to trigger BOTH conditions:
           // 1. Low probe threshold (triggers skip probe after few rows)
           // 2. Low ratio threshold (triggers skip aggregation immediately)
           // 3. Set batch_size to 1024 so our 1124 groups will trigger early 
emission
           // This creates the race condition where both emit paths are 
triggered
           let mut session_config = task_ctx.session_config().clone();
           session_config = session_config.set(
               "datafusion.execution.batch_size",
               &datafusion_common::ScalarValue::UInt64(Some(1024)),
           );
           session_config = session_config.set(
               
"datafusion.execution.skip_partial_aggregation_probe_rows_threshold",
               &datafusion_common::ScalarValue::UInt64(Some(50)),
           );
           session_config = session_config.set(
               
"datafusion.execution.skip_partial_aggregation_probe_ratio_threshold",
               &datafusion_common::ScalarValue::Float64(Some(0.8)),
           );
           task_ctx = task_ctx.with_session_config(session_config);
           let task_ctx = Arc::new(task_ctx);
   
           // Create aggregate: COUNT(*) GROUP BY group_col
           let group_expr = vec![(col("group_col", &schema)?, 
"group_col".to_string())];
           let aggr_expr = vec![Arc::new(
               AggregateExprBuilder::new(count_udaf(), vec![col("value_col", 
&schema)?])
                   .schema(Arc::clone(&schema))
                   .alias("count_value")
                   .build()?,
           )];
   
           let exec = TestMemoryExec::try_new(&input_partitions, 
Arc::clone(&schema), None)?;
           let exec = Arc::new(TestMemoryExec::update_cache(Arc::new(exec)));
   
           // Use Partial mode where the race condition occurs
           let aggregate_exec = AggregateExec::try_new(
               AggregateMode::Partial,
               PhysicalGroupBy::new_single(group_expr),
               aggr_expr,
               vec![None],
               exec,
               Arc::clone(&schema),
           )?;
   
           // Execute and collect results
           let mut stream = GroupedHashAggregateStream::new(&aggregate_exec, 
Arc::clone(&task_ctx), 0)?;
           let mut results = Vec::new();
   
           while let Some(result) = stream.next().await {
               let batch = result?;
               results.push(batch);
           }
   
           // Count total groups emitted
           let mut total_output_groups = 0;
           for batch in &results {
               total_output_groups += batch.num_rows();
           }
   
           // With the race condition bug:
           // 1. emit_early_if_necessary() emits first batch_size groups (1024)
           // 2. switch_to_skip_aggregation() immediately overwrites with 
remaining groups (100)
           // 3. The 1024 groups from step 1 are LOST!
           // 4. Only 100 groups are returned instead of 1124
   
           println!(
               "Double emission race condition test: Expected {} groups, got {} 
groups",
               num_groups, total_output_groups
           );
   
           if total_output_groups < num_groups / 2 {
               println!(
                   "🐛 BUG REPRODUCED! Lost {} groups ({:.1}% loss) - this 
indicates the double emission race condition",
                   num_groups - total_output_groups,
                   (1.0 - total_output_groups as f64 / num_groups as f64) * 
100.0
               );
           }
   
           // This test documents the expected behavior vs actual buggy behavior
           // TODO: Once fixed, this assertion should pass
           assert_eq!(
               total_output_groups, num_groups,
               "Double emission race condition detected! 
emit_early_if_necessary() result \
                was overwritten by switch_to_skip_aggregation(). Expected {} 
groups, got {} groups",
               num_groups, total_output_groups
           );
   
           Ok(())
       }
   ```
   
   ### Expected behavior
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to