wkalt opened a new issue, #17510:
URL: https://github.com/apache/datafusion/issues/17510

   ### Describe the bug
   
   The following code produces an incorrect result on Datafusion 49.0.2, but 
correct behavior on 48.0.1. In Datafusion 49.0.2. Both execution plans should 
produce the same row count. The behavior is independent of the order in which 
the original/optimized plans are executed. If the order of the executions is 
flipped, the second execution always returns the incorrect result.
   
   ```
   use arrow_array::{RecordBatch, UInt32Array};
   use arrow_schema::{DataType, Field, Schema};
   use datafusion::datasource::memory::MemTable;
   use datafusion::physical_optimizer::PhysicalOptimizerRule;
   use datafusion::physical_plan::ExecutionPlan;
   use datafusion::prelude::*;
   use datafusion_common::Result as DFResult;
   use std::sync::Arc;
   
   #[derive(Debug)]
   struct NoOpOptimizer;
   
   impl PhysicalOptimizerRule for NoOpOptimizer {
       fn optimize(
           &self,
           plan: Arc<dyn ExecutionPlan>,
           _config: &datafusion_common::config::ConfigOptions,
       ) -> DFResult<Arc<dyn ExecutionPlan>> {
           Ok(plan)
       }
   
       fn name(&self) -> &str {
           "NoOpOptimizer"
       }
   
       fn schema_check(&self) -> bool {
           true
       }
   }
   
   #[tokio::main]
   async fn main() -> Result<(), Box<dyn std::error::Error>> {
       // Create simple test data
       let schema = Arc::new(Schema::new(vec![Field::new("id", 
DataType::UInt32, false)]));
   
       let batch = RecordBatch::try_new(
           schema.clone(),
           vec![Arc::new(UInt32Array::from((0..20).collect::<Vec<u32>>()))],
       )?;
   
       let ctx = SessionContext::new();
       let provider = MemTable::try_new(schema.clone(), vec![vec![batch]])?;
       ctx.register_table("test", Arc::new(provider))?;
   
       // Create a simple plan with a sort and a limit.
       let df = ctx
           .table("test")
           .await?
           .sort(vec![col("id").sort(true, true)])?
           .limit(0, Some(10))?;
   
       let logical_plan = df.into_optimized_plan()?;
       let plan = ctx.state().create_physical_plan(&logical_plan).await?;
   
       // Apply no-op optimizer
       let optimizer = NoOpOptimizer;
       let optimized_plan = optimizer.optimize(plan.clone(), 
&Default::default())?;
   
       let task_ctx = ctx.task_ctx();
   
       let original_stream = plan.execute(0, task_ctx.clone())?;
       let original_batches = 
datafusion::physical_plan::common::collect(original_stream).await?;
       let original_count: usize = original_batches.iter().map(|b| 
b.num_rows()).sum();
   
       let optimized_stream = optimized_plan.execute(0, task_ctx)?;
       let optimized_batches = 
datafusion::physical_plan::common::collect(optimized_stream).await?;
       let optimized_count: usize = optimized_batches.iter().map(|b| 
b.num_rows()).sum();
   
       // Result mismatch on DF 49
       println!("Expected: 10 rows (limit in query)");
       println!("Optimized plan: {} rows", optimized_count);
       println!("Original plan:  {} rows", original_count);
   
       Ok(())
   }
   ```
   
   Cargo.toml:
   ```
   [package]
   name = "datafusion-regression-test"
   version = "0.1.0"
   edition = "2021"
   
   [workspace]
   
   [dependencies]
   arrow-array = "55"
   arrow-schema = "55"
   datafusion = "49.0.2"
   datafusion-common = "49.0.2"
   tokio = { version = "1", features = ["full"] }
   ```
   
   DF 49.0.2:
   ```
   Expected: 10 rows (limit in query)
   Optimized plan: 9 rows
   Original plan:  10 rows
   ```
   
   DF 48.01:
   ```
   Expected: 10 rows (limit in query)
   Optimized plan: 10 rows
   Original plan:  10 rows
   ```
   
   ### To Reproduce
   
   _No response_
   
   ### Expected behavior
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to