niebayes opened a new issue, #18816:
URL: https://github.com/apache/datafusion/issues/18816

   I manually composed a sort of simple plan: TableScan -> Projection -> Filter 
-> Sort -> Extension.
   However, DataFusion optimizer made it very complicated there're two bugs:
   - Unnecessary columns projected
   - Redundant filters pushed down
   
   DataFusion revision: 2dd17b9e6c0f2d55f6ad0a188709e48a5426b155 
   
   To produce:
   
   ``` Rust
   
   #[cfg(test)]
   mod tests {
       use std::any::Any;
       use std::sync::Arc;
   
       use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
       use async_trait::async_trait;
       use datafusion::catalog::{Session, TableProvider};
       use datafusion::datasource::provider_as_source;
       use datafusion::physical_plan::ExecutionPlan;
       use datafusion::prelude::SessionContext;
       use datafusion_common::{DFSchemaRef, Result, ScalarValue, ToDFSchema};
       use datafusion_expr::{
           col, Expr, ExprSchemable, Extension, LogicalPlan, 
LogicalPlanBuilder, SortExpr,
           TableProviderFilterPushDown, TableType, UserDefinedLogicalNodeCore,
       };
   
       fn timestamp_ms(value: i64) -> Expr {
           Expr::Literal(
               ScalarValue::TimestampMillisecond(Some(value), 
Some("UTC".into())),
               None,
           )
       }
   
       #[derive(Debug)]
       struct DummyTable {
           schema: SchemaRef,
       }
   
       #[async_trait]
       impl TableProvider for DummyTable {
           fn as_any(&self) -> &dyn Any {
               self
           }
   
           fn schema(&self) -> SchemaRef {
               self.schema.clone()
           }
   
           fn table_type(&self) -> TableType {
               TableType::Base
           }
   
           async fn scan(
               &self,
               _state: &dyn Session,
               _projection: Option<&Vec<usize>>,
               _filters: &[Expr],
               _limit: Option<usize>,
           ) -> Result<Arc<dyn ExecutionPlan>> {
               unimplemented!()
           }
   
           fn supports_filters_pushdown(
               &self,
               filters: &[&Expr],
           ) -> Result<Vec<TableProviderFilterPushDown>> {
               Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
           }
       }
   
       #[derive(Debug, Hash, PartialEq, Eq)]
       pub struct DummyPlan {
           input: Arc<LogicalPlan>,
           schema: DFSchemaRef,
       }
   
       impl PartialOrd for DummyPlan {
           fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
               self.input.partial_cmp(&other.input)
           }
       }
   
       impl UserDefinedLogicalNodeCore for DummyPlan {
           fn name(&self) -> &str {
               "DummyPlan"
           }
   
           fn inputs(&self) -> Vec<&LogicalPlan> {
               vec![&self.input]
           }
   
           fn schema(&self) -> &DFSchemaRef {
               &self.schema
           }
   
           fn expressions(&self) -> Vec<Expr> {
               vec![]
           }
   
           fn with_exprs_and_inputs(
               &self,
               _exprs: Vec<Expr>,
               mut inputs: Vec<LogicalPlan>,
           ) -> Result<Self> {
               Ok(Self {
                   input: inputs.pop().unwrap().into(),
                   schema: self.schema.clone(),
               })
           }
   
           fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> 
std::fmt::Result {
               write!(f, "{}", self.name())
           }
       }
   
       #[tokio::test]
       async fn test_optimize() -> Result<()> {
           let schema = Arc::new(Schema::new(vec![
               Field::new("a", DataType::Int32, true),
               Field::new("b", DataType::Int32, true),
               Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, 
None), true),
           ]));
           let table = Arc::new(DummyTable {
               schema: schema.clone(),
           });
   
           let context = SessionContext::new();
           context.register_table("t", table.clone())?;
   
           let plan = LogicalPlanBuilder::scan("t", provider_as_source(table), 
None)?
               .project(vec![
                   col("a"),
                   col("ts")
                       .cast_to(
                           &DataType::Timestamp(TimeUnit::Millisecond, 
Some("UTC".into())),
                           &schema.clone().to_dfschema()?,
                       )?
                       .alias_qualified(Some("t"), "ts"),
               ])?
               .filter(
                   col("ts")
                       .gt(timestamp_ms(1000))
                       .and(col("ts").lt(timestamp_ms(2000))),
               )?
               .sort(vec![
                   SortExpr::new(col("a"), true, true),
                   SortExpr::new(col("ts"), true, true),
               ])?
               .build()?;
   
           let plan = LogicalPlan::Extension(Extension {
               node: Arc::new(DummyPlan {
                   input: plan.into(),
                   schema: schema.to_dfschema_ref()?,
               }),
           });
   
           assert_eq!(
               plan.display_indent().to_string(),
               r#"DummyPlan
     Sort: t.a ASC NULLS FIRST, t.ts ASC NULLS FIRST
       Filter: t.ts > TimestampMillisecond(1000, Some("UTC")) AND t.ts < 
TimestampMillisecond(2000, Some("UTC"))
         Projection: t.a, CAST(t.ts AS Timestamp(ms, "UTC")) AS ts
           TableScan: t"#
           );
   
           let optimized_plan = context.state().optimize(&plan)?;
           assert_eq!(
               optimized_plan.display_indent().to_string(),
               r#"DummyPlan
     Sort: t.a ASC NULLS FIRST, t.ts ASC NULLS FIRST
       Projection: t.a, CAST(t.ts AS Timestamp(ms, "UTC")) AS ts
         Projection: t.a, t.b, t.ts
           Projection: CAST(t.ts AS Timestamp(ms, "UTC")) AS __common_expr_1, 
t.a, t.b, t.ts
             Projection: t.a, t.b, t.ts
               Projection: CAST(t.ts AS Timestamp(ms, "UTC")) AS 
__common_expr_2, t.a, t.b, t.ts
                 Projection: t.a, t.b, t.ts
                   Filter: __common_expr_3 > TimestampMillisecond(1000, 
Some("UTC")) AND __common_expr_3 < TimestampMillisecond(2000, Some("UTC"))
                     Projection: CAST(t.ts AS Timestamp(ms, "UTC")) AS 
__common_expr_3, t.a, t.b, t.ts
                       TableScan: t, partial_filters=[t.ts > 
TimestampNanosecond(1000000000, None), t.ts < TimestampNanosecond(2000000000, 
None), CAST(t.ts AS Timestamp(ms, "UTC")) > TimestampMillisecond(1000, 
Some("UTC")), CAST(t.ts AS Timestamp(ms, "UTC")) < TimestampMillisecond(2000, 
Some("UTC"))]"#
           );
   
           Ok(())
       }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to