wkalt opened a new issue, #17510:
URL: https://github.com/apache/datafusion/issues/17510
### Describe the bug
The following code produces an incorrect result on DataFusion 49.0.2 but
behaves correctly on 48.0.1. Both execution plans should produce the same
row count. The behavior is independent of the order in which the original
and optimized plans are executed: if the order of the executions is flipped,
the second execution always returns the incorrect result.
```
use arrow_array::{RecordBatch, UInt32Array};
use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::memory::MemTable;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use datafusion_common::Result as DFResult;
use std::sync::Arc;

// An optimizer rule that returns the plan unchanged.
#[derive(Debug)]
struct NoOpOptimizer;

impl PhysicalOptimizerRule for NoOpOptimizer {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &datafusion_common::config::ConfigOptions,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        Ok(plan)
    }

    fn name(&self) -> &str {
        "NoOpOptimizer"
    }

    fn schema_check(&self) -> bool {
        true
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create simple test data
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::UInt32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(UInt32Array::from((0..20).collect::<Vec<u32>>()))],
    )?;

    let ctx = SessionContext::new();
    let provider = MemTable::try_new(schema.clone(), vec![vec![batch]])?;
    ctx.register_table("test", Arc::new(provider))?;

    // Create a simple plan with a sort and a limit.
    let df = ctx
        .table("test")
        .await?
        .sort(vec![col("id").sort(true, true)])?
        .limit(0, Some(10))?;
    let logical_plan = df.into_optimized_plan()?;
    let plan = ctx.state().create_physical_plan(&logical_plan).await?;

    // Apply no-op optimizer
    let optimizer = NoOpOptimizer;
    let optimized_plan = optimizer.optimize(plan.clone(), &Default::default())?;

    let task_ctx = ctx.task_ctx();

    // Execute the original plan and count its rows.
    let original_stream = plan.execute(0, task_ctx.clone())?;
    let original_batches = datafusion::physical_plan::common::collect(original_stream).await?;
    let original_count: usize = original_batches.iter().map(|b| b.num_rows()).sum();

    // Execute the plan returned by the no-op optimizer and count its rows.
    let optimized_stream = optimized_plan.execute(0, task_ctx)?;
    let optimized_batches = datafusion::physical_plan::common::collect(optimized_stream).await?;
    let optimized_count: usize = optimized_batches.iter().map(|b| b.num_rows()).sum();

    // Result mismatch on DF 49
    println!("Expected: 10 rows (limit in query)");
    println!("Optimized plan: {} rows", optimized_count);
    println!("Original plan: {} rows", original_count);
    Ok(())
}
```
Cargo.toml:
```
[package]
name = "datafusion-regression-test"
version = "0.1.0"
edition = "2021"
[workspace]
[dependencies]
arrow-array = "55"
arrow-schema = "55"
datafusion = "49.0.2"
datafusion-common = "49.0.2"
tokio = { version = "1", features = ["full"] }
```
DF 49.0.2:
```
Expected: 10 rows (limit in query)
Optimized plan: 9 rows
Original plan: 10 rows
```
DF 48.0.1:
```
Expected: 10 rows (limit in query)
Optimized plan: 10 rows
Original plan: 10 rows
```
### To Reproduce
_No response_
### Expected behavior
_No response_
### Additional context
_No response_