niebayes opened a new issue, #16837: URL: https://github.com/apache/datafusion/issues/16837
### Describe the bug

In a rare case, a plan `TableScan -> Aggregate -> TopK` fails to execute and raises the following error:

```
Arrow error: Invalid argument error: column types must match schema types, expected Timestamp(Millisecond, Some("UTC")) but found Timestamp(Millisecond, None) at column index 0
```

This issue occurs if and only if all of the following conditions are met simultaneously:

1. The plan groups by a timestamp column that has a non-empty timezone.
2. The aggregation is over a column other than the timestamp column (the aggregation column cannot be a timestamp column).
3. The plan sorts by this aggregation column.
4. The sort is in descending order. (Ascending order does not trigger this bug.)
5. A non-zero limit is applied.

### To Reproduce

I have written a unit test to reproduce the bug. The test consists of two cases:

1. Construct a plan `TableScan -> Aggregate -> TopK` with `asc = true`. Executing this plan succeeds.
2. Construct the same plan but with `asc = false`, i.e. sort in descending order. Executing this plan raises the error above.
```rust
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{Int32Array, RecordBatch, TimestampMillisecondArray};
    use arrow_cast::pretty::pretty_format_batches;
    use arrow_schema::{DataType, Field, Schema, TimeUnit};
    use datafusion::{
        catalog::MemTable, datasource::provider_as_source,
        functions_aggregate::min_max::max, physical_plan::collect,
        prelude::SessionContext,
    };
    use datafusion_common::DataFusionError;
    use datafusion_expr::{LogicalPlanBuilder, SortExpr, col};

    async fn test_aggregate_then_topk(asc: bool) -> datafusion_common::Result<()> {
        // Schema: a timezone-aware timestamp column plus an Int32 value column.
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                false,
            ),
            Field::new("value", DataType::Int32, false),
        ]));

        let columns = vec![
            Arc::new(
                TimestampMillisecondArray::from(vec![1000, 2000, 3000])
                    .with_timezone("UTC".to_string()),
            ) as _,
            Arc::new(Int32Array::from(vec![1, 2, 3])) as _,
        ];
        let batch =
            RecordBatch::try_new(schema.clone(), columns).map_err(DataFusionError::from)?;
        let mem_table = MemTable::try_new(schema.clone(), vec![vec![batch]])?;

        // Group by `ts`, aggregate `value`, then sort by the aggregate with a limit of 1.
        let plan = LogicalPlanBuilder::scan("t", provider_as_source(Arc::new(mem_table)), None)?
            .aggregate(vec![col("ts")], vec![max(col("value")).alias("max_value")])?
            .sort_with_limit(vec![SortExpr::new(col("max_value"), asc, true)], Some(1))?
            .build()?;
        println!("{}", plan.display_indent());

        let session_state = SessionContext::new().state();
        let exec_plan = session_state.create_physical_plan(&plan).await?;
        let batches = collect(exec_plan, session_state.task_ctx()).await?;
        println!("{}", pretty_format_batches(&batches).unwrap());

        Ok(())
    }

    #[tokio::test]
    async fn test() {
        // Case 1: TableScan -> Aggregate -> TopK (asc = true) executes successfully.
        let result = test_aggregate_then_topk(true).await;
        assert!(result.is_ok());

        // Case 2: TableScan -> Aggregate -> TopK (asc = false) currently fails,
        // so this assertion documents the buggy behavior.
        let result = test_aggregate_then_topk(false).await;
        assert!(result.is_err());
    }
}
```

### Expected behavior

_No response_

### Additional context

_No response_
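For readers unfamiliar with the error text, the following is a minimal, standard-library-only sketch of the kind of check that produces it. It assumes a simplified stand-in for Arrow's `DataType` and a hypothetical `check_columns` helper; neither is the real Arrow or DataFusion code, and the sketch does not show where in the plan the timezone is lost. It only illustrates that a timestamp type with `Some("UTC")` and one with `None` compare as different types.

```rust
// Simplified stand-ins for Arrow's `TimeUnit` and `DataType` (assumption:
// only the variants needed for this illustration are modeled).
#[derive(Debug, Clone, PartialEq)]
enum TimeUnit {
    Millisecond,
}

#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int32,
    // The timezone is part of the type, so it takes part in equality.
    Timestamp(TimeUnit, Option<String>),
}

// Hypothetical helper: compares each column's type with the type the schema
// declares at the same index and reports the first mismatch.
fn check_columns(schema: &[DataType], columns: &[DataType]) -> Result<(), String> {
    for (i, (expected, found)) in schema.iter().zip(columns.iter()).enumerate() {
        if expected != found {
            return Err(format!(
                "column types must match schema types, expected {:?} but found {:?} at column index {}",
                expected, found, i
            ));
        }
    }
    Ok(())
}
```

Under these assumptions, passing a schema whose first column is `Timestamp(Millisecond, Some("UTC"))` together with a column typed `Timestamp(Millisecond, None)` yields an error with the same wording as the one reported above, while matching timezones pass the check.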