niebayes opened a new issue, #16837:
URL: https://github.com/apache/datafusion/issues/16837

   ### Describe the bug
   
   In rare cases, a plan of the form `TableScan -> Aggregate -> TopK` fails to execute and raises the following error:
   ```
   Arrow error: Invalid argument error: column types must match schema types, expected Timestamp(Millisecond, Some("UTC")) but found Timestamp(Millisecond, None) at column index 0
   ```
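   
   The message reads as a strict type comparison: in Arrow, the timezone is part of the timestamp type, so `Timestamp(Millisecond, Some("UTC"))` and `Timestamp(Millisecond, None)` are different types. The std-only Rust sketch below is a toy model of such a check, not Arrow's or DataFusion's actual code; `ColumnType` and `check_columns` are hypothetical names introduced only for illustration.
   
   ```rust
   /// Toy stand-in for a column data type (hypothetical, not Arrow's `DataType`).
   #[derive(Debug, Clone, PartialEq)]
   enum ColumnType {
       /// Millisecond timestamp; the optional timezone is part of the type.
       TimestampMillis(Option<String>),
       Int32,
   }
   
   /// Compare each column's type against the declared schema type and
   /// report the first mismatch, in the style of the error quoted above.
   fn check_columns(schema: &[ColumnType], columns: &[ColumnType]) -> Result<(), String> {
       if schema.len() != columns.len() {
           return Err(format!(
               "number of columns ({}) must match number of fields ({})",
               columns.len(),
               schema.len()
           ));
       }
       for (index, (expected, found)) in schema.iter().zip(columns).enumerate() {
           // Equality includes the timezone, so Some("UTC") != None.
           if expected != found {
               return Err(format!(
                   "column types must match schema types, expected {:?} but found {:?} at column index {}",
                   expected, found, index
               ));
           }
       }
       Ok(())
   }
   ```
   
   With a schema of `[TimestampMillis(Some("UTC")), Int32]`, passing columns typed `[TimestampMillis(None), Int32]` yields an `Err` that names column index 0, which mirrors the shape of the reported error.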
   
   This issue occurs if and only if all of the following conditions are met simultaneously:
   1. The plan groups by a timestamp column that has a non-empty timezone.
   2. The aggregated column is a column other than the timestamp column (the aggregated column cannot be a timestamp column).
   3. The plan sorts by this aggregated column.
   4. The sort is in descending order. (Ascending order does not trigger this bug.)
   5. A non-zero limit is applied.
   
   ### To Reproduce
   
   I have written a unit test that reproduces the bug. The test consists of two cases:
   1. Construct a plan `TableScan -> Aggregate -> TopK` with `asc = true`. This plan executes successfully.
   2. Construct the same plan with `asc = false`, i.e. sorted in descending order. Executing this plan raises the error above.
   
   ```rust
   
   #[cfg(test)]
   mod tests {
       use std::sync::Arc;
   
       use arrow_array::{Int32Array, RecordBatch, TimestampMillisecondArray};
       use arrow_cast::pretty::pretty_format_batches;
       use arrow_schema::{DataType, Field, Schema, TimeUnit};
       use datafusion::{
           catalog::MemTable, datasource::provider_as_source, functions_aggregate::min_max::max,
           physical_plan::collect, prelude::SessionContext,
       };
       use datafusion_common::DataFusionError;
       use datafusion_expr::{LogicalPlanBuilder, SortExpr, col};
   
       async fn test_aggregate_then_topk(asc: bool) -> datafusion_common::Result<()> {
           let schema = Arc::new(Schema::new(vec![
               Field::new(
                   "ts",
                   DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                   false,
               ),
               Field::new("value", DataType::Int32, false),
           ]));
           let columns = vec![
               Arc::new(
                   TimestampMillisecondArray::from(vec![1000, 2000, 3000])
                       .with_timezone("UTC".to_string()),
               ) as _,
               Arc::new(Int32Array::from(vec![1, 2, 3])) as _,
           ];
           let batch = RecordBatch::try_new(schema.clone(), columns).map_err(DataFusionError::from)?;
           let mem_table = MemTable::try_new(schema.clone(), vec![vec![batch]])?;
   
           let plan = LogicalPlanBuilder::scan("t", provider_as_source(Arc::new(mem_table)), None)?
               .aggregate(vec![col("ts")], vec![max(col("value")).alias("max_value")])?
               .sort_with_limit(vec![SortExpr::new(col("max_value"), asc, true)], Some(1))?
               .build()?;
           println!("{}", plan.display_indent());
   
           let session_state = SessionContext::new().state();
           let exec_plan = session_state.create_physical_plan(&plan).await?;
           let batches = collect(exec_plan, session_state.task_ctx()).await?;
           println!("{}", pretty_format_batches(&batches).unwrap());
   
           Ok(())
       }
   
       #[tokio::test]
       async fn test() {
           // Case 1: TableScan -> Aggregate -> Topk (asc = true)
           let result = test_aggregate_then_topk(true).await;
           assert!(result.is_ok());
   
           // Case 2: TableScan -> Aggregate -> Topk (asc = false)
           let result = test_aggregate_then_topk(false).await;
           assert!(result.is_err());
       }
   }
   ```
   
   ### Expected behavior
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

