This is an automated email from the ASF dual-hosted git repository.

berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 112cde8e3b ExecutionPlan: add APIs for filter pushdown & optimizer 
rule to apply them (#15566)
112cde8e3b is described below

commit 112cde8e3bedc0e1a56795decf14f08bebf14669
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Thu Apr 17 10:42:41 2025 -0500

    ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them 
(#15566)
    
    * ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them
    
    * wip
    
    * fix tests
    
    * fix
    
    * fix
    
    * fix doc
    
    * fix doc
    
    * Improve doc comments of `filter-pushdown-apis` (#22)
    
    * Improve doc comments
    
    * Apply suggestions from code review
    
    ---------
    
    Co-authored-by: Adrian Garcia Badaracco 
<[email protected]>
    
    * Apply suggestions from code review
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * simplify according to pr feedback
    
    * Add missing file
    
    * Add tests
    
    * pipe config in
    
    * docstrings
    
    * Update datafusion/physical-plan/src/filter_pushdown.rs
    
    * fix
    
    * fix
    
    * fmt
    
    * fix doc
    
    * add example usage of config
    
    * fix test
    
    * convert exec API and optimizer rule
    
    * re-add docs
    
    * dbg
    
    * dbg 2
    
    * avoid clones
    
    * part 3
    
    * fix lint
    
    * tests pass
    
    * Update filter.rs
    
    * update projection tests
    
    * update slt files
    
    * fix
    
    * fix references
    
    * improve impls and update tests
    
    * apply stop logic
    
    * update slt's
    
    * update other tests
    
    * minor
    
    * rename modules to match logical optimizer, tweak docs
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
    Co-authored-by: berkaysynnada <[email protected]>
    Co-authored-by: Berkay Şahin 
<[email protected]>
---
 datafusion/core/tests/physical_optimizer/mod.rs    |   1 +
 .../tests/physical_optimizer/push_down_filter.rs   | 542 +++++++++++++++++++++
 datafusion/datasource/src/file.rs                  |  22 +-
 datafusion/datasource/src/file_scan_config.rs      |  69 ++-
 datafusion/datasource/src/source.rs                |  79 ++-
 datafusion/physical-optimizer/src/lib.rs           |   1 +
 datafusion/physical-optimizer/src/optimizer.rs     |   5 +
 .../physical-optimizer/src/push_down_filter.rs     | 535 ++++++++++++++++++++
 datafusion/physical-plan/src/coalesce_batches.rs   |  15 +
 datafusion/physical-plan/src/execution_plan.rs     |  42 +-
 datafusion/physical-plan/src/filter.rs             |  56 ++-
 datafusion/physical-plan/src/filter_pushdown.rs    |  95 ++++
 datafusion/physical-plan/src/lib.rs                |   1 +
 datafusion/physical-plan/src/repartition/mod.rs    |  15 +
 datafusion/sqllogictest/test_files/aggregate.slt   |   1 -
 datafusion/sqllogictest/test_files/array.slt       |  12 +-
 datafusion/sqllogictest/test_files/cte.slt         |   2 +-
 datafusion/sqllogictest/test_files/dictionary.slt  |   2 +-
 datafusion/sqllogictest/test_files/explain.slt     |   3 +
 .../sqllogictest/test_files/simplify_expr.slt      |   1 -
 20 files changed, 1462 insertions(+), 37 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/mod.rs 
b/datafusion/core/tests/physical_optimizer/mod.rs
index 7d5d07715e..6643e7fd59 100644
--- a/datafusion/core/tests/physical_optimizer/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/mod.rs
@@ -25,6 +25,7 @@ mod join_selection;
 mod limit_pushdown;
 mod limited_distinct_aggregation;
 mod projection_pushdown;
+mod push_down_filter;
 mod replace_with_order_preserving_variants;
 mod sanity_checker;
 mod test_utils;
diff --git a/datafusion/core/tests/physical_optimizer/push_down_filter.rs 
b/datafusion/core/tests/physical_optimizer/push_down_filter.rs
new file mode 100644
index 0000000000..b19144f1bc
--- /dev/null
+++ b/datafusion/core/tests/physical_optimizer/push_down_filter.rs
@@ -0,0 +1,542 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::{Arc, OnceLock};
+use std::{
+    any::Any,
+    fmt::{Display, Formatter},
+};
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::{
+    datasource::object_store::ObjectStoreUrl,
+    logical_expr::Operator,
+    physical_plan::{
+        expressions::{BinaryExpr, Column, Literal},
+        PhysicalExpr,
+    },
+    scalar::ScalarValue,
+};
+use datafusion_common::{config::ConfigOptions, Statistics};
+use datafusion_common::{internal_err, Result};
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::{
+    file::FileSource, file_scan_config::FileScanConfig, 
file_stream::FileOpener,
+};
+use datafusion_expr::test::function_stub::count_udaf;
+use datafusion_physical_expr::expressions::col;
+use datafusion_physical_expr::{
+    aggregate::AggregateExprBuilder, conjunction, Partitioning,
+};
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+use datafusion_physical_optimizer::push_down_filter::PushdownFilter;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::filter_pushdown::{
+    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
+    FilterPushdownSupport,
+};
+use datafusion_physical_plan::{
+    aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
+    coalesce_batches::CoalesceBatchesExec,
+    filter::FilterExec,
+    repartition::RepartitionExec,
+};
+use datafusion_physical_plan::{
+    displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, 
ExecutionPlan,
+};
+
+use object_store::ObjectStore;
+
+/// A placeholder data source that accepts filter pushdown
+#[derive(Clone, Default)]
+struct TestSource {
+    support: bool,
+    predicate: Option<Arc<dyn PhysicalExpr>>,
+    statistics: Option<Statistics>,
+}
+
+impl TestSource {
+    fn new(support: bool) -> Self {
+        Self {
+            support,
+            predicate: None,
+            statistics: None,
+        }
+    }
+}
+
+impl FileSource for TestSource {
+    fn create_file_opener(
+        &self,
+        _object_store: Arc<dyn ObjectStore>,
+        _base_config: &FileScanConfig,
+        _partition: usize,
+    ) -> Arc<dyn FileOpener> {
+        todo!("should not be called")
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        todo!("should not be called")
+    }
+
+    fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+        todo!("should not be called")
+    }
+
+    fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
+        todo!("should not be called")
+    }
+
+    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> 
{
+        todo!("should not be called")
+    }
+
+    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+        Arc::new(TestSource {
+            statistics: Some(statistics),
+            ..self.clone()
+        })
+    }
+
+    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+        todo!("should not be called")
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        Ok(self
+            .statistics
+            .as_ref()
+            .expect("statistics not set")
+            .clone())
+    }
+
+    fn file_type(&self) -> &str {
+        "test"
+    }
+
+    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> 
std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                let support = format!(", pushdown_supported={}", self.support);
+
+                let predicate_string = self
+                    .predicate
+                    .as_ref()
+                    .map(|p| format!(", predicate={p}"))
+                    .unwrap_or_default();
+
+                write!(f, "{}{}", support, predicate_string)
+            }
+            DisplayFormatType::TreeRender => {
+                if let Some(predicate) = &self.predicate {
+                    writeln!(f, "pushdown_supported={}", 
fmt_sql(predicate.as_ref()))?;
+                    writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
+                }
+                Ok(())
+            }
+        }
+    }
+
+    fn try_pushdown_filters(
+        &self,
+        mut fd: FilterDescription,
+        config: &ConfigOptions,
+    ) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
+        if self.support && config.execution.parquet.pushdown_filters {
+            if let Some(internal) = self.predicate.as_ref() {
+                fd.filters.push(Arc::clone(internal));
+            }
+            let all_filters = fd.take_description();
+
+            Ok(FilterPushdownResult {
+                support: FilterPushdownSupport::Supported {
+                    child_descriptions: vec![],
+                    op: Arc::new(TestSource {
+                        support: true,
+                        predicate: Some(conjunction(all_filters)),
+                        statistics: self.statistics.clone(), // should be 
updated in reality
+                    }),
+                    revisit: false,
+                },
+                remaining_description: FilterDescription::empty(),
+            })
+        } else {
+            Ok(filter_pushdown_not_supported(fd))
+        }
+    }
+}
+
+fn test_scan(support: bool) -> Arc<dyn ExecutionPlan> {
+    let schema = schema();
+    let source = Arc::new(TestSource::new(support));
+    let base_config = FileScanConfigBuilder::new(
+        ObjectStoreUrl::parse("test://").unwrap(),
+        Arc::clone(schema),
+        source,
+    )
+    .build();
+    DataSourceExec::from_data_source(base_config)
+}
+
+#[test]
+fn test_pushdown_into_scan() {
+    let scan = test_scan(true);
+    let predicate = col_lit_predicate("a", "foo", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap());
+
+    // expect the predicate to be pushed down into the DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}, true),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo
+        -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test, pushdown_supported=true, predicate=a@0 = foo
+    "
+    );
+}
+
+/// Show that we can use config options to determine how to do pushdown.
+#[test]
+fn test_pushdown_into_scan_with_config_options() {
+    let scan = test_scan(true);
+    let predicate = col_lit_predicate("a", "foo", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap()) as _;
+
+    let mut cfg = ConfigOptions::default();
+    insta::assert_snapshot!(
+        OptimizationTest::new(
+            Arc::clone(&plan),
+            PushdownFilter {},
+            false
+        ),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo
+        -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - FilterExec: a@0 = foo
+          -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, 
c], file_type=test, pushdown_supported=true
+    "
+    );
+
+    cfg.execution.parquet.pushdown_filters = true;
+    insta::assert_snapshot!(
+        OptimizationTest::new(
+            plan,
+            PushdownFilter {},
+            true
+        ),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo
+        -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test, pushdown_supported=true, predicate=a@0 = foo
+    "
+    );
+}
+
+#[test]
+fn test_filter_collapse() {
+    // filter should be pushed down into the parquet scan with two filters
+    let scan = test_scan(true);
+    let predicate1 = col_lit_predicate("a", "foo", schema());
+    let filter1 = Arc::new(FilterExec::try_new(predicate1, scan).unwrap());
+    let predicate2 = col_lit_predicate("b", "bar", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap());
+
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}, true),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: b@1 = bar
+        -   FilterExec: a@0 = foo
+        -     DataSourceExec: file_groups={0 groups: []}, projection=[a, b, 
c], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test, pushdown_supported=true, predicate=b@1 = bar AND a@0 = foo
+    "
+    );
+}
+
+#[test]
+fn test_filter_with_projection() {
+    let scan = test_scan(true);
+    let projection = vec![1, 0];
+    let predicate = col_lit_predicate("a", "foo", schema());
+    let plan = Arc::new(
+        FilterExec::try_new(predicate, Arc::clone(&scan))
+            .unwrap()
+            .with_projection(Some(projection))
+            .unwrap(),
+    );
+
+    // expect the predicate to be pushed down into the DataSource but the 
FilterExec to be converted to ProjectionExec
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}, true),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo, projection=[b@1, a@0]
+        -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - ProjectionExec: expr=[b@1 as b, a@0 as a]
+          -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, 
c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
+    ",
+    );
+
+    // add a test where the filter is on a column that isn't included in the 
output
+    let projection = vec![1];
+    let predicate = col_lit_predicate("a", "foo", schema());
+    let plan = Arc::new(
+        FilterExec::try_new(predicate, scan)
+            .unwrap()
+            .with_projection(Some(projection))
+            .unwrap(),
+    );
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{},true),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = foo, projection=[b@1]
+        -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], 
file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - ProjectionExec: expr=[b@1 as b]
+          -   DataSourceExec: file_groups={0 groups: []}, projection=[a, b, 
c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
+    "
+    );
+}
+
+#[test]
+fn test_push_down_through_transparent_nodes() {
+    // expect the predicate to be pushed down into the DataSource
+    let scan = test_scan(true);
+    let coalesce = Arc::new(CoalesceBatchesExec::new(scan, 1));
+    let predicate = col_lit_predicate("a", "foo", schema());
+    let filter = Arc::new(FilterExec::try_new(predicate, coalesce).unwrap());
+    let repartition = Arc::new(
+        RepartitionExec::try_new(filter, 
Partitioning::RoundRobinBatch(1)).unwrap(),
+    );
+    let predicate = col_lit_predicate("b", "bar", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, repartition).unwrap());
+
+    // expect the predicate to be pushed down into the DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{},true),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: b@1 = bar
+        -   RepartitionExec: partitioning=RoundRobinBatch(1), 
input_partitions=0
+        -     FilterExec: a@0 = foo
+        -       CoalesceBatchesExec: target_batch_size=1
+        -         DataSourceExec: file_groups={0 groups: []}, projection=[a, 
b, c], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - RepartitionExec: partitioning=RoundRobinBatch(1), 
input_partitions=0
+          -   CoalesceBatchesExec: target_batch_size=1
+          -     DataSourceExec: file_groups={0 groups: []}, projection=[a, b, 
c], file_type=test, pushdown_supported=true, predicate=b@1 = bar AND a@0 = foo
+    "
+    );
+}
+
+#[test]
+fn test_no_pushdown_through_aggregates() {
+    // There are 2 important points here:
+    // 1. The outer filter **is not** pushed down at all because we haven't 
implemented pushdown support
+    //    yet for AggregateExec.
+    // 2. The inner filter **is** pushed down into the DataSource.
+    let scan = test_scan(true);
+
+    let coalesce = Arc::new(CoalesceBatchesExec::new(scan, 10));
+
+    let filter = Arc::new(
+        FilterExec::try_new(col_lit_predicate("a", "foo", schema()), 
coalesce).unwrap(),
+    );
+
+    let aggregate_expr =
+        vec![
+            AggregateExprBuilder::new(count_udaf(), vec![col("a", 
schema()).unwrap()])
+                .schema(Arc::clone(schema()))
+                .alias("cnt")
+                .build()
+                .map(Arc::new)
+                .unwrap(),
+        ];
+    let group_by = PhysicalGroupBy::new_single(vec![
+        (col("a", schema()).unwrap(), "a".to_string()),
+        (col("b", schema()).unwrap(), "b".to_string()),
+    ]);
+    let aggregate = Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Final,
+            group_by,
+            aggregate_expr.clone(),
+            vec![None],
+            filter,
+            Arc::clone(schema()),
+        )
+        .unwrap(),
+    );
+
+    let coalesce = Arc::new(CoalesceBatchesExec::new(aggregate, 100));
+
+    let predicate = col_lit_predicate("b", "bar", schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, coalesce).unwrap());
+
+    // expect the predicate to be pushed down into the DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(plan, PushdownFilter{}, true),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: b@1 = bar
+        -   CoalesceBatchesExec: target_batch_size=100
+        -     AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], 
ordering_mode=PartiallySorted([0])
+        -       FilterExec: a@0 = foo
+        -         CoalesceBatchesExec: target_batch_size=10
+        -           DataSourceExec: file_groups={0 groups: []}, projection=[a, 
b, c], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - FilterExec: b@1 = bar
+          -   CoalesceBatchesExec: target_batch_size=100
+          -     AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt]
+          -       CoalesceBatchesExec: target_batch_size=10
+          -         DataSourceExec: file_groups={0 groups: []}, projection=[a, 
b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
+    "
+    );
+}
+
+/// Schema:
+/// a: String
+/// b: String
+/// c: f64
+static TEST_SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
+
+fn schema() -> &'static SchemaRef {
+    TEST_SCHEMA.get_or_init(|| {
+        let fields = vec![
+            Field::new("a", DataType::Utf8, false),
+            Field::new("b", DataType::Utf8, false),
+            Field::new("c", DataType::Float64, false),
+        ];
+        Arc::new(Schema::new(fields))
+    })
+}
+
+/// Returns a predicate that is a binary expression col = lit
+fn col_lit_predicate(
+    column_name: &str,
+    scalar_value: impl Into<ScalarValue>,
+    schema: &Schema,
+) -> Arc<dyn PhysicalExpr> {
+    let scalar_value = scalar_value.into();
+    Arc::new(BinaryExpr::new(
+        Arc::new(Column::new_with_schema(column_name, schema).unwrap()),
+        Operator::Eq,
+        Arc::new(Literal::new(scalar_value)),
+    ))
+}
+
+/// A harness for testing physical optimizers.
+///
+/// You can use this to test the output of a physical optimizer rule using 
insta snapshots
+#[derive(Debug)]
+pub struct OptimizationTest {
+    input: Vec<String>,
+    output: Result<Vec<String>, String>,
+}
+
+impl OptimizationTest {
+    pub fn new<O>(
+        input_plan: Arc<dyn ExecutionPlan>,
+        opt: O,
+        allow_pushdown_filters: bool,
+    ) -> Self
+    where
+        O: PhysicalOptimizerRule,
+    {
+        let mut parquet_pushdown_config = ConfigOptions::default();
+        parquet_pushdown_config.execution.parquet.pushdown_filters =
+            allow_pushdown_filters;
+
+        let input = format_execution_plan(&input_plan);
+        let input_schema = input_plan.schema();
+
+        let output_result = opt.optimize(input_plan, &parquet_pushdown_config);
+        let output = output_result
+            .and_then(|plan| {
+                if opt.schema_check() && (plan.schema() != input_schema) {
+                    internal_err!(
+                        "Schema mismatch:\n\nBefore:\n{:?}\n\nAfter:\n{:?}",
+                        input_schema,
+                        plan.schema()
+                    )
+                } else {
+                    Ok(plan)
+                }
+            })
+            .map(|plan| format_execution_plan(&plan))
+            .map_err(|e| e.to_string());
+
+        Self { input, output }
+    }
+}
+
+impl Display for OptimizationTest {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        writeln!(f, "OptimizationTest:")?;
+        writeln!(f, "  input:")?;
+        for line in &self.input {
+            writeln!(f, "    - {line}")?;
+        }
+        writeln!(f, "  output:")?;
+        match &self.output {
+            Ok(output) => {
+                writeln!(f, "    Ok:")?;
+                for line in output {
+                    writeln!(f, "      - {line}")?;
+                }
+            }
+            Err(err) => {
+                writeln!(f, "    Err: {err}")?;
+            }
+        }
+        Ok(())
+    }
+}
+
+pub fn format_execution_plan(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
+    format_lines(&displayable(plan.as_ref()).indent(false).to_string())
+}
+
+fn format_lines(s: &str) -> Vec<String> {
+    s.trim().split('\n').map(|s| s.to_string()).collect()
+}
diff --git a/datafusion/datasource/src/file.rs 
b/datafusion/datasource/src/file.rs
index 0066f39801..835285b21e 100644
--- a/datafusion/datasource/src/file.rs
+++ b/datafusion/datasource/src/file.rs
@@ -26,8 +26,12 @@ use crate::file_groups::FileGroupPartitioner;
 use crate::file_scan_config::FileScanConfig;
 use crate::file_stream::FileOpener;
 use arrow::datatypes::SchemaRef;
-use datafusion_common::Statistics;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::{Result, Statistics};
 use datafusion_physical_expr::LexOrdering;
+use datafusion_physical_plan::filter_pushdown::{
+    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
+};
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::DisplayFormatType;
 
@@ -57,7 +61,7 @@ pub trait FileSource: Send + Sync {
     /// Return execution plan metrics
     fn metrics(&self) -> &ExecutionPlanMetricsSet;
     /// Return projected statistics
-    fn statistics(&self) -> datafusion_common::Result<Statistics>;
+    fn statistics(&self) -> Result<Statistics>;
     /// String representation of file source such as "csv", "json", "parquet"
     fn file_type(&self) -> &str;
     /// Format FileType specific information
@@ -75,7 +79,7 @@ pub trait FileSource: Send + Sync {
         repartition_file_min_size: usize,
         output_ordering: Option<LexOrdering>,
         config: &FileScanConfig,
-    ) -> datafusion_common::Result<Option<FileScanConfig>> {
+    ) -> Result<Option<FileScanConfig>> {
         if config.file_compression_type.is_compressed() || 
config.new_lines_in_values {
             return Ok(None);
         }
@@ -93,4 +97,16 @@ pub trait FileSource: Send + Sync {
         }
         Ok(None)
     }
+
+    /// Try to push down filters into this FileSource.
+    /// See [`ExecutionPlan::try_pushdown_filters`] for more details.
+    ///
+    /// [`ExecutionPlan::try_pushdown_filters`]: 
datafusion_physical_plan::ExecutionPlan::try_pushdown_filters
+    fn try_pushdown_filters(
+        &self,
+        fd: FilterDescription,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
+        Ok(filter_pushdown_not_supported(fd))
+    }
 }
diff --git a/datafusion/datasource/src/file_scan_config.rs 
b/datafusion/datasource/src/file_scan_config.rs
index b20f621474..fb756cc11f 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -23,6 +23,16 @@ use std::{
     fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
 };
 
+use crate::file_groups::FileGroup;
+use crate::{
+    display::FileGroupsDisplay,
+    file::FileSource,
+    file_compression_type::FileCompressionType,
+    file_stream::FileStream,
+    source::{DataSource, DataSourceExec},
+    statistics::MinMaxStatistics,
+    PartitionedFile,
+};
 use arrow::{
     array::{
         ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch,
@@ -31,7 +41,9 @@ use arrow::{
     buffer::Buffer,
     datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, 
UInt16Type},
 };
-use datafusion_common::{exec_err, ColumnStatistics, Constraints, Result, 
Statistics};
+use datafusion_common::{
+    config::ConfigOptions, exec_err, ColumnStatistics, Constraints, Result, 
Statistics,
+};
 use datafusion_common::{DataFusionError, ScalarValue};
 use datafusion_execution::{
     object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
@@ -40,6 +52,10 @@ use datafusion_physical_expr::{
     expressions::Column, EquivalenceProperties, LexOrdering, Partitioning,
     PhysicalSortExpr,
 };
+use datafusion_physical_plan::filter_pushdown::{
+    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
+    FilterPushdownSupport,
+};
 use datafusion_physical_plan::{
     display::{display_orderings, ProjectSchemaDisplay},
     metrics::ExecutionPlanMetricsSet,
@@ -48,17 +64,6 @@ use datafusion_physical_plan::{
 };
 use log::{debug, warn};
 
-use crate::file_groups::FileGroup;
-use crate::{
-    display::FileGroupsDisplay,
-    file::FileSource,
-    file_compression_type::FileCompressionType,
-    file_stream::FileStream,
-    source::{DataSource, DataSourceExec},
-    statistics::MinMaxStatistics,
-    PartitionedFile,
-};
-
 /// The base configurations for a [`DataSourceExec`], the a physical plan for
 /// any given file format.
 ///
@@ -588,6 +593,46 @@ impl DataSource for FileScanConfig {
             ) as _
         }))
     }
+
+    fn try_pushdown_filters(
+        &self,
+        fd: FilterDescription,
+        config: &ConfigOptions,
+    ) -> Result<FilterPushdownResult<Arc<dyn DataSource>>> {
+        let FilterPushdownResult {
+            support,
+            remaining_description,
+        } = self.file_source.try_pushdown_filters(fd, config)?;
+
+        match support {
+            FilterPushdownSupport::Supported {
+                child_descriptions,
+                op,
+                revisit,
+            } => {
+                let new_data_source = Arc::new(
+                    FileScanConfigBuilder::from(self.clone())
+                        .with_source(op)
+                        .build(),
+                );
+
+                debug_assert!(child_descriptions.is_empty());
+                debug_assert!(!revisit);
+
+                Ok(FilterPushdownResult {
+                    support: FilterPushdownSupport::Supported {
+                        child_descriptions,
+                        op: new_data_source,
+                        revisit,
+                    },
+                    remaining_description,
+                })
+            }
+            FilterPushdownSupport::NotSupported => {
+                Ok(filter_pushdown_not_supported(remaining_description))
+            }
+        }
+    }
 }
 
 impl FileScanConfig {
diff --git a/datafusion/datasource/src/source.rs 
b/datafusion/datasource/src/source.rs
index 6c9122ce1a..2d6ea1a8b3 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -31,10 +31,14 @@ use datafusion_physical_plan::{
 
 use crate::file_scan_config::FileScanConfig;
 use datafusion_common::config::ConfigOptions;
-use datafusion_common::{Constraints, Statistics};
+use datafusion_common::{Constraints, Result, Statistics};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_physical_plan::filter_pushdown::{
+    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
+    FilterPushdownSupport,
+};
 
 /// Common behaviors in Data Sources for both from Files and Memory.
 ///
@@ -51,7 +55,7 @@ pub trait DataSource: Send + Sync + Debug {
         &self,
         partition: usize,
         context: Arc<TaskContext>,
-    ) -> datafusion_common::Result<SendableRecordBatchStream>;
+    ) -> Result<SendableRecordBatchStream>;
     fn as_any(&self) -> &dyn Any;
     /// Format this source for display in explain plans
     fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
@@ -62,13 +66,13 @@ pub trait DataSource: Send + Sync + Debug {
         _target_partitions: usize,
         _repartition_file_min_size: usize,
         _output_ordering: Option<LexOrdering>,
-    ) -> datafusion_common::Result<Option<Arc<dyn DataSource>>> {
+    ) -> Result<Option<Arc<dyn DataSource>>> {
         Ok(None)
     }
 
     fn output_partitioning(&self) -> Partitioning;
     fn eq_properties(&self) -> EquivalenceProperties;
-    fn statistics(&self) -> datafusion_common::Result<Statistics>;
+    fn statistics(&self) -> Result<Statistics>;
     /// Return a copy of this DataSource with a new fetch limit
     fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
     fn fetch(&self) -> Option<usize>;
@@ -78,7 +82,16 @@ pub trait DataSource: Send + Sync + Debug {
     fn try_swapping_with_projection(
         &self,
         _projection: &ProjectionExec,
-    ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>>;
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
+    /// Try to push down filters into this DataSource.
+    /// See [`ExecutionPlan::try_pushdown_filters`] for more details.
+    fn try_pushdown_filters(
+        &self,
+        fd: FilterDescription,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownResult<Arc<dyn DataSource>>> {
+        Ok(filter_pushdown_not_supported(fd))
+    }
 }
 
 /// [`ExecutionPlan`] handles different file formats like JSON, CSV, AVRO, 
ARROW, PARQUET
@@ -131,7 +144,7 @@ impl ExecutionPlan for DataSourceExec {
     fn with_new_children(
         self: Arc<Self>,
         _: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+    ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(self)
     }
 
@@ -139,7 +152,7 @@ impl ExecutionPlan for DataSourceExec {
         &self,
         target_partitions: usize,
         config: &ConfigOptions,
-    ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
         let data_source = self.data_source.repartitioned(
             target_partitions,
             config.optimizer.repartition_file_min_size,
@@ -163,7 +176,7 @@ impl ExecutionPlan for DataSourceExec {
         &self,
         partition: usize,
         context: Arc<TaskContext>,
-    ) -> datafusion_common::Result<SendableRecordBatchStream> {
+    ) -> Result<SendableRecordBatchStream> {
         self.data_source.open(partition, context)
     }
 
@@ -171,7 +184,7 @@ impl ExecutionPlan for DataSourceExec {
         Some(self.data_source.metrics().clone_inner())
     }
 
-    fn statistics(&self) -> datafusion_common::Result<Statistics> {
+    fn statistics(&self) -> Result<Statistics> {
         self.data_source.statistics()
     }
 
@@ -189,9 +202,45 @@ impl ExecutionPlan for DataSourceExec {
     fn try_swapping_with_projection(
         &self,
         projection: &ProjectionExec,
-    ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
         self.data_source.try_swapping_with_projection(projection)
     }
+
+    fn try_pushdown_filters(
+        &self,
+        fd: FilterDescription,
+        config: &ConfigOptions,
+    ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
+        let FilterPushdownResult {
+            support,
+            remaining_description,
+        } = self.data_source.try_pushdown_filters(fd, config)?;
+
+        match support {
+            FilterPushdownSupport::Supported {
+                child_descriptions,
+                op,
+                revisit,
+            } => {
+                let new_exec = Arc::new(DataSourceExec::new(op));
+
+                debug_assert!(child_descriptions.is_empty());
+                debug_assert!(!revisit);
+
+                Ok(FilterPushdownResult {
+                    support: FilterPushdownSupport::Supported {
+                        child_descriptions,
+                        op: new_exec,
+                        revisit,
+                    },
+                    remaining_description,
+                })
+            }
+            FilterPushdownSupport::NotSupported => {
+                Ok(filter_pushdown_not_supported(remaining_description))
+            }
+        }
+    }
 }
 
 impl DataSourceExec {
@@ -254,3 +303,13 @@ impl DataSourceExec {
             })
     }
 }
+
+/// Create a new `DataSourceExec` from a `DataSource`
+impl<S> From<S> for DataSourceExec
+where
+    S: DataSource + 'static,
+{
+    fn from(source: S) -> Self {
+        Self::new(Arc::new(source))
+    }
+}
diff --git a/datafusion/physical-optimizer/src/lib.rs 
b/datafusion/physical-optimizer/src/lib.rs
index 35503f3b0b..57dac21b6e 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -36,6 +36,7 @@ pub mod optimizer;
 pub mod output_requirements;
 pub mod projection_pushdown;
 pub mod pruning;
+pub mod push_down_filter;
 pub mod sanity_checker;
 pub mod topk_aggregation;
 pub mod update_aggr_exprs;
diff --git a/datafusion/physical-optimizer/src/optimizer.rs 
b/datafusion/physical-optimizer/src/optimizer.rs
index bab31150e2..d4ff7d6b9e 100644
--- a/datafusion/physical-optimizer/src/optimizer.rs
+++ b/datafusion/physical-optimizer/src/optimizer.rs
@@ -30,6 +30,7 @@ use crate::limit_pushdown::LimitPushdown;
 use crate::limited_distinct_aggregation::LimitedDistinctAggregation;
 use crate::output_requirements::OutputRequirements;
 use crate::projection_pushdown::ProjectionPushdown;
+use crate::push_down_filter::PushdownFilter;
 use crate::sanity_checker::SanityCheckPlan;
 use crate::topk_aggregation::TopKAggregation;
 use crate::update_aggr_exprs::OptimizeAggregateOrder;
@@ -121,6 +122,10 @@ impl PhysicalOptimizer {
             // into an `order by max(x) limit y`. In this case it will copy 
the limit value down
             // to the aggregation, allowing it to use only y number of 
accumulators.
             Arc::new(TopKAggregation::new()),
+            // The FilterPushdown rule tries to push down filters as far as it 
can.
+            // For example, it will push down filtering from a `FilterExec` to
+            // a `DataSourceExec`, or from a `TopK`'s current state to a 
`DataSourceExec`.
+            Arc::new(PushdownFilter::new()),
             // The LimitPushdown rule tries to push limits down as far as 
possible,
             // replacing operators with fetching variants, or adding limits
             // past operators that support limit pushdown.
diff --git a/datafusion/physical-optimizer/src/push_down_filter.rs 
b/datafusion/physical-optimizer/src/push_down_filter.rs
new file mode 100644
index 0000000000..80201454d0
--- /dev/null
+++ b/datafusion/physical-optimizer/src/push_down_filter.rs
@@ -0,0 +1,535 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::PhysicalOptimizerRule;
+
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
+use datafusion_common::{config::ConfigOptions, Result};
+use datafusion_physical_expr::conjunction;
+use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::filter_pushdown::{
+    FilterDescription, FilterPushdownResult, FilterPushdownSupport,
+};
+use datafusion_physical_plan::tree_node::PlanContext;
+use datafusion_physical_plan::ExecutionPlan;
+
+/// Attempts to recursively push given filters from the top of the tree into 
leafs.
+///
+/// # Default Implementation
+///
+/// The default implementation in [`ExecutionPlan::try_pushdown_filters`] is a 
no-op
+/// that assumes that:
+///
+/// * Parent filters can't be passed onto children.
+/// * This node has no filters to contribute.
+///
+/// # Example: Push filter into a `DataSourceExec`
+///
+/// For example, consider the following plan:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec  │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │      FilterExec      │
+/// │  filters = [ id=1]   │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │    DataSourceExec    │
+/// │    projection = *    │
+/// └──────────────────────┘
+/// ```
+///
+/// Our goal is to move the `id = 1` filter from the [`FilterExec`] node to 
the `DataSourceExec` node.
+///
+/// If this filter is selective pushing it into the scan can avoid massive
+/// amounts of data being read from the source (the projection is `*` so all
+/// matching columns are read).
+///
+/// The new plan looks like:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec  │
+/// └──────────────────────┘
+///           │
+///           ▼
+/// ┌──────────────────────┐
+/// │    DataSourceExec    │
+/// │    projection = *    │
+/// │   filters = [ id=1]  │
+/// └──────────────────────┘
+/// ```
+///
+/// # Example: Push filters with `ProjectionExec`
+///
+/// Let's consider a more complex example involving a [`ProjectionExec`]
+/// node in between the [`FilterExec`] and `DataSourceExec` nodes that
+/// creates a new column that the filter depends on.
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec  │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │      FilterExec      │
+/// │    filters =         │
+/// │     [cost>50,id=1]   │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │    ProjectionExec    │
+/// │ cost = price * 1.2   │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │    DataSourceExec    │
+/// │    projection = *    │
+/// └──────────────────────┘
+/// ```
+///
+/// We want to push down the filters `[id=1]` to the `DataSourceExec` node,
+/// but can't push down `cost>50` because it requires the [`ProjectionExec`]
+/// node to be executed first. A simple thing to do would be to split up the
+/// filter into two separate filters and push down the first one:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec  │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │      FilterExec      │
+/// │    filters =         │
+/// │     [cost>50]        │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │    ProjectionExec    │
+/// │ cost = price * 1.2   │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │    DataSourceExec    │
+/// │    projection = *    │
+/// │   filters = [ id=1]  │
+/// └──────────────────────┘
+/// ```
+///
+/// We can actually however do better by pushing down `price * 1.2 > 50`
+/// instead of `cost > 50`:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec  │
+/// └──────────────────────┘
+///            │
+///            ▼
+/// ┌──────────────────────┐
+/// │    ProjectionExec    │
+/// │ cost = price * 1.2   │
+/// └──────────────────────┘
+///            │
+///            ▼
+/// ┌──────────────────────┐
+/// │    DataSourceExec    │
+/// │    projection = *    │
+/// │   filters = [id=1,   │
+/// │   price * 1.2 > 50]  │
+/// └──────────────────────┘
+/// ```
+///
+/// # Example: Push filters within a subtree
+///
+/// There are also cases where we may be able to push down filters within a
+/// subtree but not the entire tree. A good example of this is aggregation
+/// nodes:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ ProjectionExec       │
+/// │ projection = *       │
+/// └──────────────────────┘
+///           │
+///           ▼
+/// ┌──────────────────────┐
+/// │ FilterExec           │
+/// │ filters = [sum > 10] │
+/// └──────────────────────┘
+///           │
+///           ▼
+/// ┌───────────────────────┐
+/// │     AggregateExec     │
+/// │    group by = [id]    │
+/// │    aggregate =        │
+/// │      [sum(price)]     │
+/// └───────────────────────┘
+///           │
+///           ▼
+/// ┌──────────────────────┐
+/// │ FilterExec           │
+/// │ filters = [id=1]     │
+/// └──────────────────────┘
+///          │
+///          ▼
+/// ┌──────────────────────┐
+/// │ DataSourceExec       │
+/// │ projection = *       │
+/// └──────────────────────┘
+/// ```
+///
+/// The transformation here is to push down the `id=1` filter to the
+/// `DataSourceExec` node:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ ProjectionExec       │
+/// │ projection = *       │
+/// └──────────────────────┘
+///           │
+///           ▼
+/// ┌──────────────────────┐
+/// │ FilterExec           │
+/// │ filters = [sum > 10] │
+/// └──────────────────────┘
+///           │
+///           ▼
+/// ┌───────────────────────┐
+/// │     AggregateExec     │
+/// │    group by = [id]    │
+/// │    aggregate =        │
+/// │      [sum(price)]     │
+/// └───────────────────────┘
+///           │
+///           ▼
+/// ┌──────────────────────┐
+/// │ DataSourceExec       │
+/// │ projection = *       │
+/// │ filters = [id=1]     │
+/// └──────────────────────┘
+/// ```
+///
+/// The point here is that:
+/// 1. We cannot push down `sum > 10` through the [`AggregateExec`] node into 
the `DataSourceExec` node.
+///    Any filters above the [`AggregateExec`] node are not pushed down.
+///    This is determined by calling [`ExecutionPlan::try_pushdown_filters`] 
on the [`AggregateExec`] node.
+/// 2. We need to keep recursing into the tree so that we can discover the 
other [`FilterExec`] node and push
+///    down the `id=1` filter.
+///
+/// # Example: Push filters through Joins
+///
+/// It is also possible to push down filters through joins and filters that
+/// originate from joins. For example, a hash join where we build a hash
+/// table of the left side and probe the right side (ignoring why we would
+/// choose this order, typically it depends on the size of each table,
+/// etc.).
+///
+/// ```text
+///              ┌─────────────────────┐
+///              │     FilterExec      │
+///              │ filters =           │
+///              │  [d.size > 100]     │
+///              └─────────────────────┘
+///                         │
+///                         │
+///              ┌──────────▼──────────┐
+///              │                     │
+///              │    HashJoinExec     │
+///              │ [u.dept@hash(d.id)] │
+///              │                     │
+///              └─────────────────────┘
+///                         │
+///            ┌────────────┴────────────┐
+/// ┌──────────▼──────────┐   ┌──────────▼──────────┐
+/// │   DataSourceExec    │   │   DataSourceExec    │
+/// │  alias [users as u] │   │  alias [dept as d]  │
+/// │                     │   │                     │
+/// └─────────────────────┘   └─────────────────────┘
+/// ```
+///
+/// There are two pushdowns we can do here:
+/// 1. Push down the `d.size > 100` filter through the `HashJoinExec` node to 
the `DataSourceExec`
+///    node for the `departments` table.
+/// 2. Push down the hash table state from the `HashJoinExec` node to the 
`DataSourceExec` node to avoid reading
+///    rows from the `users` table that will be eliminated by the join.
+///    This can be done via a bloom filter or similar and is not (yet) 
supported
+///    in DataFusion. See <https://github.com/apache/datafusion/issues/7955>.
+///
+/// ```text
+///              ┌─────────────────────┐
+///              │                     │
+///              │    HashJoinExec     │
+///              │ [u.dept@hash(d.id)] │
+///              │                     │
+///              └─────────────────────┘
+///                         │
+///            ┌────────────┴────────────┐
+/// ┌──────────▼──────────┐   ┌──────────▼──────────┐
+/// │   DataSourceExec    │   │   DataSourceExec    │
+/// │  alias [users as u] │   │  alias [dept as d]  │
+/// │ filters =           │   │  filters =          │
+/// │   [depg@hash(d.id)] │   │    [ d.size > 100]  │
+/// └─────────────────────┘   └─────────────────────┘
+/// ```
+///
+/// You may notice in this case that the filter is *dynamic*: the hash table
+/// is built _after_ the `departments` table is read and at runtime. We
+/// don't have a concrete `InList` filter or similar to push down at
+/// optimization time. These sorts of dynamic filters are handled by
+/// building a specialized [`PhysicalExpr`] that can be evaluated at runtime
+/// and internally maintains a reference to the hash table or other state.
+///
+/// To make working with these sorts of dynamic filters more tractable we have 
the method [`PhysicalExpr::snapshot`]
+/// which attempts to simplify a dynamic filter into a "basic" non-dynamic 
filter.
+/// For a join this could mean converting it to an `InList` filter or a 
min/max filter for example.
+/// See `datafusion/physical-plan/src/dynamic_filters.rs` for more details.
+///
+/// # Example: Push TopK filters into Scans
+///
+/// Another form of dynamic filter is pushing down the state of a `TopK`
+/// operator for queries like `SELECT * FROM t ORDER BY id LIMIT 10`:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │       TopK           │
+/// │     limit = 10       │
+/// │   order by = [id]    │
+/// └──────────────────────┘
+///            │
+///            ▼
+/// ┌──────────────────────┐
+/// │    DataSourceExec    │
+/// │    projection = *    │
+/// └──────────────────────┘
+/// ```
+///
+/// We can avoid large amounts of data processing by transforming this into:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │       TopK           │
+/// │     limit = 10       │
+/// │   order by = [id]    │
+/// └──────────────────────┘
+///            │
+///            ▼
+/// ┌──────────────────────┐
+/// │    DataSourceExec    │
+/// │    projection = *    │
+/// │ filters =            │
+/// │    [id < @ TopKHeap] │
+/// └──────────────────────┘
+/// ```
+///
+/// Now as we fill our `TopK` heap we can push down the state of the heap to
+/// the `DataSourceExec` node to avoid reading files / row groups / pages /
+/// rows that could not possibly be in the top 10.
+///
+/// This is not yet implemented in DataFusion. See
+/// <https://github.com/apache/datafusion/issues/15037>
+///
+/// [`PhysicalExpr`]: datafusion_physical_plan::PhysicalExpr
+/// [`PhysicalExpr::snapshot`]: 
datafusion_physical_plan::PhysicalExpr::snapshot
+/// [`FilterExec`]: datafusion_physical_plan::filter::FilterExec
+/// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec
+/// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec
+#[derive(Debug)]
+pub struct PushdownFilter {}
+
+impl Default for PushdownFilter {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+pub type FilterDescriptionContext = PlanContext<FilterDescription>;
+
+impl PhysicalOptimizerRule for PushdownFilter {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let context = FilterDescriptionContext::new_default(plan);
+
+        context
+            .transform_up(|node| {
+                if node.plan.as_any().downcast_ref::<FilterExec>().is_some() {
+                    let initial_plan = Arc::clone(&node.plan);
+                    let mut accept_updated = false;
+                    let updated_node = node.transform_down(|filter_node| {
+                        Self::try_pushdown(filter_node, config, &mut 
accept_updated)
+                    });
+
+                    if accept_updated {
+                        updated_node
+                    } else {
+                        
Ok(Transformed::no(FilterDescriptionContext::new_default(
+                            initial_plan,
+                        )))
+                    }
+                }
+                // Other filter introducing operators extends here
+                else {
+                    Ok(Transformed::no(node))
+                }
+            })
+            .map(|updated| updated.data.plan)
+    }
+
+    fn name(&self) -> &str {
+        "PushdownFilter"
+    }
+
+    fn schema_check(&self) -> bool {
+        true // Filter pushdown does not change the schema of the plan
+    }
+}
+
+impl PushdownFilter {
+    pub fn new() -> Self {
+        Self {}
+    }
+
+    fn try_pushdown(
+        mut node: FilterDescriptionContext,
+        config: &ConfigOptions,
+        accept_updated: &mut bool,
+    ) -> Result<Transformed<FilterDescriptionContext>> {
+        let initial_description = FilterDescription {
+            filters: node.data.take_description(),
+        };
+
+        let FilterPushdownResult {
+            support,
+            remaining_description,
+        } = node
+            .plan
+            .try_pushdown_filters(initial_description, config)?;
+
+        match support {
+            FilterPushdownSupport::Supported {
+                mut child_descriptions,
+                op,
+                revisit,
+            } => {
+                if revisit {
+                    // This check handles cases where the current operator is 
entirely removed
+                    // from the plan and replaced with its child. In such 
cases, to not skip
+                    // over the new node, we need to explicitly re-apply this 
pushdown logic
+                    // to the new node.
+                    //
+                    // TODO: If TreeNodeRecursion supports a Revisit mechanism 
in the future,
+                    //       this manual recursion could be removed.
+
+                    // If the operator is removed, it should not leave any 
filters as remaining
+                    debug_assert!(remaining_description.filters.is_empty());
+                    // Operators having 2 children cannot be removed
+                    debug_assert_eq!(child_descriptions.len(), 1);
+                    debug_assert_eq!(node.children.len(), 1);
+
+                    node.plan = op;
+                    node.data = child_descriptions.swap_remove(0);
+                    node.children = node.children.swap_remove(0).children;
+                    Self::try_pushdown(node, config, accept_updated)
+                } else {
+                    if remaining_description.filters.is_empty() {
+                        // Filter can be pushed down safely
+                        node.plan = op;
+                        if node.children.is_empty() {
+                            *accept_updated = true;
+                        } else {
+                            for (child, descr) in
+                                
node.children.iter_mut().zip(child_descriptions)
+                            {
+                                child.data = descr;
+                            }
+                        }
+                    } else {
+                        // Filter cannot be pushed down
+                        node = insert_filter_exec(
+                            node,
+                            child_descriptions,
+                            remaining_description,
+                        )?;
+                    }
+                    Ok(Transformed::yes(node))
+                }
+            }
+            FilterPushdownSupport::NotSupported => {
+                if remaining_description.filters.is_empty() {
+                    Ok(Transformed {
+                        data: node,
+                        transformed: false,
+                        tnr: TreeNodeRecursion::Stop,
+                    })
+                } else {
+                    node = insert_filter_exec(
+                        node,
+                        vec![FilterDescription::empty(); 1],
+                        remaining_description,
+                    )?;
+                    Ok(Transformed {
+                        data: node,
+                        transformed: true,
+                        tnr: TreeNodeRecursion::Stop,
+                    })
+                }
+            }
+        }
+    }
+}
+
+fn insert_filter_exec(
+    node: FilterDescriptionContext,
+    mut child_descriptions: Vec<FilterDescription>,
+    remaining_description: FilterDescription,
+) -> Result<FilterDescriptionContext> {
+    let mut new_child_node = node;
+
+    // Filter has one child
+    if !child_descriptions.is_empty() {
+        debug_assert_eq!(child_descriptions.len(), 1);
+        new_child_node.data = child_descriptions.swap_remove(0);
+    }
+    let new_plan = Arc::new(FilterExec::try_new(
+        conjunction(remaining_description.filters),
+        Arc::clone(&new_child_node.plan),
+    )?);
+    let new_children = vec![new_child_node];
+    let new_data = FilterDescription::empty();
+
+    Ok(FilterDescriptionContext::new(
+        new_plan,
+        new_data,
+        new_children,
+    ))
+}
diff --git a/datafusion/physical-plan/src/coalesce_batches.rs 
b/datafusion/physical-plan/src/coalesce_batches.rs
index 5244038b9a..faab5fdc5e 100644
--- a/datafusion/physical-plan/src/coalesce_batches.rs
+++ b/datafusion/physical-plan/src/coalesce_batches.rs
@@ -35,6 +35,10 @@ use datafusion_execution::TaskContext;
 
 use crate::coalesce::{BatchCoalescer, CoalescerState};
 use crate::execution_plan::CardinalityEffect;
+use crate::filter_pushdown::{
+    filter_pushdown_transparent, FilterDescription, FilterPushdownResult,
+};
+use datafusion_common::config::ConfigOptions;
 use futures::ready;
 use futures::stream::{Stream, StreamExt};
 
@@ -212,6 +216,17 @@ impl ExecutionPlan for CoalesceBatchesExec {
     fn cardinality_effect(&self) -> CardinalityEffect {
         CardinalityEffect::Equal
     }
+
+    fn try_pushdown_filters(
+        &self,
+        fd: FilterDescription,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
+        Ok(filter_pushdown_transparent::<Arc<dyn ExecutionPlan>>(
+            Arc::new(self.clone()),
+            fd,
+        ))
+    }
 }
 
 /// Stream for [`CoalesceBatchesExec`]. See [`CoalesceBatchesExec`] for more 
details.
diff --git a/datafusion/physical-plan/src/execution_plan.rs 
b/datafusion/physical-plan/src/execution_plan.rs
index 2bc5706ee0..2b6eac7be0 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -16,6 +16,9 @@
 // under the License.
 
 pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, 
VerboseDisplay};
+use crate::filter_pushdown::{
+    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
+};
 pub use crate::metrics::Metric;
 pub use crate::ordering::InputOrderMode;
 pub use crate::stream::EmptyRecordBatchStream;
@@ -467,6 +470,41 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
         Ok(None)
     }
+
+    /// Attempts to recursively push given filters from the top of the tree 
into leafs.
+    ///
+    /// This is used for various optimizations, such as:
+    ///
+    /// * Pushing down filters into scans in general to minimize the amount of 
data that needs to be materialzied.
+    /// * Pushing down dynamic filters from operators like TopK and Joins into 
scans.
+    ///
+    /// Generally the further down (closer to leaf nodes) that filters can be 
pushed, the better.
+    ///
+    /// Consider the case of a query such as `SELECT * FROM t WHERE a = 1 AND 
b = 2`.
+    /// With no filter pushdown the scan needs to read and materialize all the 
data from `t` and then filter based on `a` and `b`.
+    /// With filter pushdown into the scan it can first read only `a`, then 
`b` and keep track of
+    /// which rows match the filter.
+    /// Then only for rows that match the filter does it have to materialize 
the rest of the columns.
+    ///
+    /// # Default Implementation
+    ///
+    /// The default implementation assumes:
+    /// * Parent filters can't be passed onto children.
+    /// * This node has no filters to contribute.
+    ///
+    /// # Implementation Notes
+    ///
+    /// Most of the actual logic is implemented as a Physical Optimizer rule.
+    /// See [`PushdownFilter`] for more details.
+    ///
+    /// [`PushdownFilter`]: 
https://docs.rs/datafusion/latest/datafusion/physical_optimizer/filter_pushdown/struct.PushdownFilter.html
+    fn try_pushdown_filters(
+        &self,
+        fd: FilterDescription,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
+        Ok(filter_pushdown_not_supported(fd))
+    }
 }
 
 /// [`ExecutionPlan`] Invariant Level
@@ -519,13 +557,15 @@ pub trait ExecutionPlanProperties {
     /// If this ExecutionPlan makes no changes to the schema of the rows 
flowing
     /// through it or how columns within each row relate to each other, it
     /// should return the equivalence properties of its input. For
-    /// example, since `FilterExec` may remove rows from its input, but does 
not
+    /// example, since [`FilterExec`] may remove rows from its input, but does 
not
     /// otherwise modify them, it preserves its input equivalence properties.
     /// However, since `ProjectionExec` may calculate derived expressions, it
     /// needs special handling.
     ///
     /// See also [`ExecutionPlan::maintains_input_order`] and 
[`Self::output_ordering`]
     /// for related concepts.
+    ///
+    /// [`FilterExec`]: crate::filter::FilterExec
     fn equivalence_properties(&self) -> &EquivalenceProperties;
 }
 
diff --git a/datafusion/physical-plan/src/filter.rs 
b/datafusion/physical-plan/src/filter.rs
index a8a9973ea0..95fa67025e 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -26,6 +26,9 @@ use super::{
 };
 use crate::common::can_project;
 use crate::execution_plan::CardinalityEffect;
+use crate::filter_pushdown::{
+    FilterDescription, FilterPushdownResult, FilterPushdownSupport,
+};
 use crate::projection::{
     make_with_child, try_embed_projection, update_expr, EmbeddedProjection,
     ProjectionExec,
@@ -39,6 +42,7 @@ use arrow::compute::filter_record_batch;
 use arrow::datatypes::{DataType, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::cast::as_boolean_array;
+use datafusion_common::config::ConfigOptions;
 use datafusion_common::stats::Precision;
 use datafusion_common::{
     internal_err, plan_err, project_schema, DataFusionError, Result, 
ScalarValue,
@@ -46,7 +50,7 @@ use datafusion_common::{
 use datafusion_execution::TaskContext;
 use datafusion_expr::Operator;
 use datafusion_physical_expr::equivalence::ProjectionMapping;
-use datafusion_physical_expr::expressions::BinaryExpr;
+use datafusion_physical_expr::expressions::{BinaryExpr, Column};
 use datafusion_physical_expr::intervals::utils::check_support;
 use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::{
@@ -433,6 +437,56 @@ impl ExecutionPlan for FilterExec {
         }
         try_embed_projection(projection, self)
     }
+
+    fn try_pushdown_filters(
+        &self,
+        mut fd: FilterDescription,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
+        // Extend the filter descriptions
+        fd.filters.push(Arc::clone(&self.predicate));
+
+        // Extract the information
+        let child_descriptions = vec![fd];
+        let remaining_description = FilterDescription { filters: vec![] };
+        let filter_input = Arc::clone(self.input());
+
+        if let Some(projection_indices) = self.projection.as_ref() {
+            // Push the filters down, but leave a ProjectionExec behind, 
instead of the FilterExec
+            let filter_child_schema = filter_input.schema();
+            let proj_exprs = projection_indices
+                .iter()
+                .map(|p| {
+                    let field = filter_child_schema.field(*p).clone();
+                    (
+                        Arc::new(Column::new(field.name(), *p)) as Arc<dyn 
PhysicalExpr>,
+                        field.name().to_string(),
+                    )
+                })
+                .collect::<Vec<_>>();
+            let projection_exec =
+                Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?) 
as _;
+
+            Ok(FilterPushdownResult {
+                support: FilterPushdownSupport::Supported {
+                    child_descriptions,
+                    op: projection_exec,
+                    revisit: false,
+                },
+                remaining_description,
+            })
+        } else {
+            // Pull out the FilterExec, and inform the rule as it should be 
re-run
+            Ok(FilterPushdownResult {
+                support: FilterPushdownSupport::Supported {
+                    child_descriptions,
+                    op: filter_input,
+                    revisit: true,
+                },
+                remaining_description,
+            })
+        }
+    }
 }
 
 impl EmbeddedProjection for FilterExec {
diff --git a/datafusion/physical-plan/src/filter_pushdown.rs 
b/datafusion/physical-plan/src/filter_pushdown.rs
new file mode 100644
index 0000000000..38f5aef592
--- /dev/null
+++ b/datafusion/physical-plan/src/filter_pushdown.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::ExecutionPlan;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+
+#[derive(Clone, Debug)]
+pub struct FilterDescription {
+    /// Expressions coming from the parent nodes
+    pub filters: Vec<Arc<dyn PhysicalExpr>>,
+}
+
+impl Default for FilterDescription {
+    fn default() -> Self {
+        Self::empty()
+    }
+}
+
+impl FilterDescription {
+    /// Takes the filters out of the struct, leaving an empty vector in its 
place.
+    pub fn take_description(&mut self) -> Vec<Arc<dyn PhysicalExpr>> {
+        std::mem::take(&mut self.filters)
+    }
+
+    pub fn empty() -> FilterDescription {
+        Self { filters: vec![] }
+    }
+}
+
+#[derive(Debug)]
+pub enum FilterPushdownSupport<T> {
+    Supported {
+        // Filter predicates which can be pushed down through the operator.
+        // NOTE that these are not placed into any operator.
+        child_descriptions: Vec<FilterDescription>,
+        // Possibly updated new operator
+        op: T,
+        // Whether the node is removed from the plan and the rule should be 
re-run manually
+        // on the new node.
+        // TODO: If TreeNodeRecursion supports Revisit mechanism, this flag 
can be removed
+        revisit: bool,
+    },
+    NotSupported,
+}
+
+#[derive(Debug)]
+pub struct FilterPushdownResult<T> {
+    pub support: FilterPushdownSupport<T>,
+    // Filters which cannot be pushed down through the operator.
+    // NOTE that caller of try_pushdown_filters() should handle these 
remanining predicates,
+    // possibly introducing a FilterExec on top of this operator.
+    pub remaining_description: FilterDescription,
+}
+
+pub fn filter_pushdown_not_supported<T>(
+    remaining_description: FilterDescription,
+) -> FilterPushdownResult<T> {
+    FilterPushdownResult {
+        support: FilterPushdownSupport::NotSupported,
+        remaining_description,
+    }
+}
+
+pub fn filter_pushdown_transparent<T>(
+    plan: Arc<dyn ExecutionPlan>,
+    fd: FilterDescription,
+) -> FilterPushdownResult<Arc<dyn ExecutionPlan>> {
+    let child_descriptions = vec![fd];
+    let remaining_description = FilterDescription::empty();
+
+    FilterPushdownResult {
+        support: FilterPushdownSupport::Supported {
+            child_descriptions,
+            op: plan,
+            revisit: false,
+        },
+        remaining_description,
+    }
+}
diff --git a/datafusion/physical-plan/src/lib.rs 
b/datafusion/physical-plan/src/lib.rs
index b256e615b2..a1862554b3 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -67,6 +67,7 @@ pub mod empty;
 pub mod execution_plan;
 pub mod explain;
 pub mod filter;
+pub mod filter_pushdown;
 pub mod joins;
 pub mod limit;
 pub mod memory;
diff --git a/datafusion/physical-plan/src/repartition/mod.rs 
b/datafusion/physical-plan/src/repartition/mod.rs
index 71479ffa96..c480fc2aba 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -43,6 +43,7 @@ use crate::{DisplayFormatType, ExecutionPlan, Partitioning, 
PlanProperties, Stat
 use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions};
 use arrow::compute::take_arrays;
 use arrow::datatypes::{SchemaRef, UInt32Type};
+use datafusion_common::config::ConfigOptions;
 use datafusion_common::utils::transpose;
 use datafusion_common::HashMap;
 use datafusion_common::{not_impl_err, DataFusionError, Result};
@@ -52,6 +53,9 @@ use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 
+use crate::filter_pushdown::{
+    filter_pushdown_transparent, FilterDescription, FilterPushdownResult,
+};
 use futures::stream::Stream;
 use futures::{FutureExt, StreamExt, TryStreamExt};
 use log::trace;
@@ -730,6 +734,17 @@ impl ExecutionPlan for RepartitionExec {
             new_partitioning,
         )?)))
     }
+
+    fn try_pushdown_filters(
+        &self,
+        fd: FilterDescription,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
+        Ok(filter_pushdown_transparent::<Arc<dyn ExecutionPlan>>(
+            Arc::new(self.clone()),
+            fd,
+        ))
+    }
 }
 
 impl RepartitionExec {
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt 
b/datafusion/sqllogictest/test_files/aggregate.slt
index a55ac079aa..35bbc75814 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -6795,4 +6795,3 @@ select c2, count(*) from test WHERE 1 = 1 group by c2;
 4 1
 5 1
 6 1
-
diff --git a/datafusion/sqllogictest/test_files/array.slt 
b/datafusion/sqllogictest/test_files/array.slt
index f165d3bf66..e780d6c8b2 100644
--- a/datafusion/sqllogictest/test_files/array.slt
+++ b/datafusion/sqllogictest/test_files/array.slt
@@ -5992,7 +5992,7 @@ logical_plan
 02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
 03)----SubqueryAlias: test
 04)------SubqueryAlias: t
-05)--------Projection:
+05)--------Projection: 
 06)----------Filter: substr(CAST(md5(CAST(tmp_table.value AS Utf8)) AS Utf8), 
Int64(1), Int64(32)) IN ([Utf8View("7f4b18de3cfeb9b4ac78c381ee2ad278"), 
Utf8View("a"), Utf8View("b"), Utf8View("c")])
 07)------------TableScan: tmp_table projection=[value]
 physical_plan
@@ -6021,7 +6021,7 @@ logical_plan
 02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
 03)----SubqueryAlias: test
 04)------SubqueryAlias: t
-05)--------Projection:
+05)--------Projection: 
 06)----------Filter: substr(CAST(md5(CAST(tmp_table.value AS Utf8)) AS Utf8), 
Int64(1), Int64(32)) IN ([Utf8View("7f4b18de3cfeb9b4ac78c381ee2ad278"), 
Utf8View("a"), Utf8View("b"), Utf8View("c")])
 07)------------TableScan: tmp_table projection=[value]
 physical_plan
@@ -6050,7 +6050,7 @@ logical_plan
 02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
 03)----SubqueryAlias: test
 04)------SubqueryAlias: t
-05)--------Projection:
+05)--------Projection: 
 06)----------Filter: substr(CAST(md5(CAST(tmp_table.value AS Utf8)) AS Utf8), 
Int64(1), Int64(32)) IN ([Utf8View("7f4b18de3cfeb9b4ac78c381ee2ad278"), 
Utf8View("a"), Utf8View("b"), Utf8View("c")])
 07)------------TableScan: tmp_table projection=[value]
 physical_plan
@@ -6081,7 +6081,7 @@ logical_plan
 02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
 03)----SubqueryAlias: test
 04)------SubqueryAlias: t
-05)--------Projection:
+05)--------Projection: 
 06)----------Filter: array_has(LargeList([7f4b18de3cfeb9b4ac78c381ee2ad278, a, 
b, c]), substr(CAST(md5(CAST(tmp_table.value AS Utf8)) AS Utf8), Int64(1), 
Int64(32)))
 07)------------TableScan: tmp_table projection=[value]
 physical_plan
@@ -6110,7 +6110,7 @@ logical_plan
 02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
 03)----SubqueryAlias: test
 04)------SubqueryAlias: t
-05)--------Projection:
+05)--------Projection: 
 06)----------Filter: substr(CAST(md5(CAST(tmp_table.value AS Utf8)) AS Utf8), 
Int64(1), Int64(32)) IN ([Utf8View("7f4b18de3cfeb9b4ac78c381ee2ad278"), 
Utf8View("a"), Utf8View("b"), Utf8View("c")])
 07)------------TableScan: tmp_table projection=[value]
 physical_plan
@@ -6130,7 +6130,7 @@ select count(*) from test WHERE array_has([needle], 
needle);
 ----
 100000
 
-# The optimizer does not currently eliminate the filter; 
+# The optimizer does not currently eliminate the filter;
 # Instead, it's rewritten as `IS NULL OR NOT NULL` due to SQL null semantics
 query TT
 explain with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM 
generate_series(1, 100000) t(i))
diff --git a/datafusion/sqllogictest/test_files/cte.slt 
b/datafusion/sqllogictest/test_files/cte.slt
index e019af9775..32320a06f4 100644
--- a/datafusion/sqllogictest/test_files/cte.slt
+++ b/datafusion/sqllogictest/test_files/cte.slt
@@ -722,7 +722,7 @@ logical_plan
 03)----Projection: Int64(1) AS val
 04)------EmptyRelation
 05)----Projection: Int64(2) AS val
-06)------Cross Join:
+06)------Cross Join: 
 07)--------Filter: recursive_cte.val < Int64(2)
 08)----------TableScan: recursive_cte
 09)--------SubqueryAlias: sub_cte
diff --git a/datafusion/sqllogictest/test_files/dictionary.slt 
b/datafusion/sqllogictest/test_files/dictionary.slt
index 1769f42c2d..d241e61f33 100644
--- a/datafusion/sqllogictest/test_files/dictionary.slt
+++ b/datafusion/sqllogictest/test_files/dictionary.slt
@@ -456,4 +456,4 @@ statement ok
 CREATE TABLE test0 AS VALUES ('foo',1), ('bar',2), ('foo',3);
 
 statement ok
-COPY (SELECT arrow_cast(column1, 'Dictionary(Int32, Utf8)') AS column1, 
column2 FROM test0) TO 'test_files/scratch/copy/part_dict_test' STORED AS 
PARQUET PARTITIONED BY (column1);
\ No newline at end of file
+COPY (SELECT arrow_cast(column1, 'Dictionary(Int32, Utf8)') AS column1, 
column2 FROM test0) TO 'test_files/scratch/copy/part_dict_test' STORED AS 
PARQUET PARTITIONED BY (column1);
diff --git a/datafusion/sqllogictest/test_files/explain.slt 
b/datafusion/sqllogictest/test_files/explain.slt
index deff793e51..ba2596551f 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -237,6 +237,7 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after coalesce_batches SAME TEXT AS ABOVE
 physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, 
c], file_type=csv, has_header=true
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
+physical_plan after PushdownFilter SAME TEXT AS ABOVE
 physical_plan after LimitPushdown SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
@@ -313,6 +314,7 @@ physical_plan after OutputRequirements
 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), 
Bytes=Exact(671), 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
+physical_plan after PushdownFilter SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
@@ -353,6 +355,7 @@ physical_plan after OutputRequirements
 01)GlobalLimitExec: skip=0, fetch=10
 02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
+physical_plan after PushdownFilter SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
diff --git a/datafusion/sqllogictest/test_files/simplify_expr.slt 
b/datafusion/sqllogictest/test_files/simplify_expr.slt
index 9985ab49c2..075ccafcfd 100644
--- a/datafusion/sqllogictest/test_files/simplify_expr.slt
+++ b/datafusion/sqllogictest/test_files/simplify_expr.slt
@@ -107,4 +107,3 @@ query B
 SELECT a / NULL::DECIMAL(4,3) > 1.2::decimal(2,1) FROM VALUES (1) AS t(a);
 ----
 NULL
-


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to