This is an automated email from the ASF dual-hosted git repository.
berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 112cde8e3b ExecutionPlan: add APIs for filter pushdown & optimizer
rule to apply them (#15566)
112cde8e3b is described below
commit 112cde8e3bedc0e1a56795decf14f08bebf14669
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Thu Apr 17 10:42:41 2025 -0500
ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them
(#15566)
* ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them
* wip
* fix tests
* fix
* fix
* fix doc
* fix doc
* Improve doc comments of `filter-pushdown-apis` (#22)
* Improve doc comments
* Apply suggestions from code review
---------
Co-authored-by: Adrian Garcia Badaracco
<[email protected]>
* Apply suggestions from code review
Co-authored-by: Andrew Lamb <[email protected]>
* simplify according to pr feedback
* Add missing file
* Add tests
* pipe config in
* docstrings
* Update datafusion/physical-plan/src/filter_pushdown.rs
* fix
* fix
* fmt
* fix doc
* add example usage of config
* fix test
* convert exec API and optimizer rule
* re-add docs
* dbg
* dbg 2
* avoid clones
* part 3
* fix lint
* tests pass
* Update filter.rs
* update projection tests
* update slt files
* fix
* fix references
* improve impls and update tests
* apply stop logic
* update slt's
* update other tests
* minor
* rename modules to match logical optimizer, tweak docs
---------
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: berkaysynnada <[email protected]>
Co-authored-by: Berkay Şahin
<[email protected]>
---
datafusion/core/tests/physical_optimizer/mod.rs | 1 +
.../tests/physical_optimizer/push_down_filter.rs | 542 +++++++++++++++++++++
datafusion/datasource/src/file.rs | 22 +-
datafusion/datasource/src/file_scan_config.rs | 69 ++-
datafusion/datasource/src/source.rs | 79 ++-
datafusion/physical-optimizer/src/lib.rs | 1 +
datafusion/physical-optimizer/src/optimizer.rs | 5 +
.../physical-optimizer/src/push_down_filter.rs | 535 ++++++++++++++++++++
datafusion/physical-plan/src/coalesce_batches.rs | 15 +
datafusion/physical-plan/src/execution_plan.rs | 42 +-
datafusion/physical-plan/src/filter.rs | 56 ++-
datafusion/physical-plan/src/filter_pushdown.rs | 95 ++++
datafusion/physical-plan/src/lib.rs | 1 +
datafusion/physical-plan/src/repartition/mod.rs | 15 +
datafusion/sqllogictest/test_files/aggregate.slt | 1 -
datafusion/sqllogictest/test_files/array.slt | 12 +-
datafusion/sqllogictest/test_files/cte.slt | 2 +-
datafusion/sqllogictest/test_files/dictionary.slt | 2 +-
datafusion/sqllogictest/test_files/explain.slt | 3 +
.../sqllogictest/test_files/simplify_expr.slt | 1 -
20 files changed, 1462 insertions(+), 37 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/mod.rs
b/datafusion/core/tests/physical_optimizer/mod.rs
index 7d5d07715e..6643e7fd59 100644
--- a/datafusion/core/tests/physical_optimizer/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/mod.rs
@@ -25,6 +25,7 @@ mod join_selection;
mod limit_pushdown;
mod limited_distinct_aggregation;
mod projection_pushdown;
+mod push_down_filter;
mod replace_with_order_preserving_variants;
mod sanity_checker;
mod test_utils;
diff --git a/datafusion/core/tests/physical_optimizer/push_down_filter.rs
b/datafusion/core/tests/physical_optimizer/push_down_filter.rs
new file mode 100644
index 0000000000..b19144f1bc
--- /dev/null
+++ b/datafusion/core/tests/physical_optimizer/push_down_filter.rs
@@ -0,0 +1,542 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::{Arc, OnceLock};
+use std::{
+ any::Any,
+ fmt::{Display, Formatter},
+};
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::{
+ datasource::object_store::ObjectStoreUrl,
+ logical_expr::Operator,
+ physical_plan::{
+ expressions::{BinaryExpr, Column, Literal},
+ PhysicalExpr,
+ },
+ scalar::ScalarValue,
+};
+use datafusion_common::{config::ConfigOptions, Statistics};
+use datafusion_common::{internal_err, Result};
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::{
+ file::FileSource, file_scan_config::FileScanConfig,
file_stream::FileOpener,
+};
+use datafusion_expr::test::function_stub::count_udaf;
+use datafusion_physical_expr::expressions::col;
+use datafusion_physical_expr::{
+ aggregate::AggregateExprBuilder, conjunction, Partitioning,
+};
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+use datafusion_physical_optimizer::push_down_filter::PushdownFilter;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::filter_pushdown::{
+ filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
+ FilterPushdownSupport,
+};
+use datafusion_physical_plan::{
+ aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
+ coalesce_batches::CoalesceBatchesExec,
+ filter::FilterExec,
+ repartition::RepartitionExec,
+};
+use datafusion_physical_plan::{
+ displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType,
ExecutionPlan,
+};
+
+use object_store::ObjectStore;
+
+/// A placeholder data source that accepts filter pushdown
+#[derive(Clone, Default)]
+struct TestSource {
+ support: bool,
+ predicate: Option<Arc<dyn PhysicalExpr>>,
+ statistics: Option<Statistics>,
+}
+
+impl TestSource {
+ fn new(support: bool) -> Self {
+ Self {
+ support,
+ predicate: None,
+ statistics: None,
+ }
+ }
+}
+
+impl FileSource for TestSource {
+ fn create_file_opener(
+ &self,
+ _object_store: Arc<dyn ObjectStore>,
+ _base_config: &FileScanConfig,
+ _partition: usize,
+ ) -> Arc<dyn FileOpener> {
+ todo!("should not be called")
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ todo!("should not be called")
+ }
+
+ fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+ todo!("should not be called")
+ }
+
+ fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
+ todo!("should not be called")
+ }
+
+ fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource>
{
+ todo!("should not be called")
+ }
+
+ fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+ Arc::new(TestSource {
+ statistics: Some(statistics),
+ ..self.clone()
+ })
+ }
+
+ fn metrics(&self) -> &ExecutionPlanMetricsSet {
+ todo!("should not be called")
+ }
+
+ fn statistics(&self) -> Result<Statistics> {
+ Ok(self
+ .statistics
+ .as_ref()
+ .expect("statistics not set")
+ .clone())
+ }
+
+ fn file_type(&self) -> &str {
+ "test"
+ }
+
+ fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) ->
std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let support = format!(", pushdown_supported={}", self.support);
+
+ let predicate_string = self
+ .predicate
+ .as_ref()
+ .map(|p| format!(", predicate={p}"))
+ .unwrap_or_default();
+
+ write!(f, "{}{}", support, predicate_string)
+ }
+ DisplayFormatType::TreeRender => {
+ if let Some(predicate) = &self.predicate {
+ writeln!(f, "pushdown_supported={}",
fmt_sql(predicate.as_ref()))?;
+ writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
+ }
+ Ok(())
+ }
+ }
+ }
+
+ fn try_pushdown_filters(
+ &self,
+ mut fd: FilterDescription,
+ config: &ConfigOptions,
+ ) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
+ if self.support && config.execution.parquet.pushdown_filters {
+ if let Some(internal) = self.predicate.as_ref() {
+ fd.filters.push(Arc::clone(internal));
+ }
+ let all_filters = fd.take_description();
+
+ Ok(FilterPushdownResult {
+ support: FilterPushdownSupport::Supported {
+ child_descriptions: vec![],
+ op: Arc::new(TestSource {
+ support: true,
+ predicate: Some(conjunction(all_filters)),
+ statistics: self.statistics.clone(), // should be
updated in reality
+ }),
+ revisit: false,
+ },
+ remaining_description: FilterDescription::empty(),
+ })
+ } else {
+ Ok(filter_pushdown_not_supported(fd))
+ }
+ }
+}
+
+fn test_scan(support: bool) -> Arc<dyn ExecutionPlan> {
+ let schema = schema();
+ let source = Arc::new(TestSource::new(support));
+ let base_config = FileScanConfigBuilder::new(
+ ObjectStoreUrl::parse("test://").unwrap(),
+ Arc::clone(schema),
+ source,
+ )
+ .build();
+ DataSourceExec::from_data_source(base_config)
+}
+
+#[test]
+fn test_pushdown_into_scan() {
+ let scan = test_scan(true);
+ let predicate = col_lit_predicate("a", "foo", schema());
+ let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap());
+
+ // expect the predicate to be pushed down into the DataSource
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownFilter{}, true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: a@0 = foo
+ - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c],
file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c],
file_type=test, pushdown_supported=true, predicate=a@0 = foo
+ "
+ );
+}
+
+/// Show that we can use config options to determine how to do pushdown.
+#[test]
+fn test_pushdown_into_scan_with_config_options() {
+ let scan = test_scan(true);
+ let predicate = col_lit_predicate("a", "foo", schema());
+ let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap()) as _;
+
+ let mut cfg = ConfigOptions::default();
+ insta::assert_snapshot!(
+ OptimizationTest::new(
+ Arc::clone(&plan),
+ PushdownFilter {},
+ false
+ ),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: a@0 = foo
+ - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c],
file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - FilterExec: a@0 = foo
+ - DataSourceExec: file_groups={0 groups: []}, projection=[a, b,
c], file_type=test, pushdown_supported=true
+ "
+ );
+
+ cfg.execution.parquet.pushdown_filters = true;
+ insta::assert_snapshot!(
+ OptimizationTest::new(
+ plan,
+ PushdownFilter {},
+ true
+ ),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: a@0 = foo
+ - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c],
file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c],
file_type=test, pushdown_supported=true, predicate=a@0 = foo
+ "
+ );
+}
+
+#[test]
+fn test_filter_collapse() {
+ // filter should be pushed down into the parquet scan with two filters
+ let scan = test_scan(true);
+ let predicate1 = col_lit_predicate("a", "foo", schema());
+ let filter1 = Arc::new(FilterExec::try_new(predicate1, scan).unwrap());
+ let predicate2 = col_lit_predicate("b", "bar", schema());
+ let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap());
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownFilter{}, true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: b@1 = bar
+ - FilterExec: a@0 = foo
+ - DataSourceExec: file_groups={0 groups: []}, projection=[a, b,
c], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c],
file_type=test, pushdown_supported=true, predicate=b@1 = bar AND a@0 = foo
+ "
+ );
+}
+
+#[test]
+fn test_filter_with_projection() {
+ let scan = test_scan(true);
+ let projection = vec![1, 0];
+ let predicate = col_lit_predicate("a", "foo", schema());
+ let plan = Arc::new(
+ FilterExec::try_new(predicate, Arc::clone(&scan))
+ .unwrap()
+ .with_projection(Some(projection))
+ .unwrap(),
+ );
+
+ // expect the predicate to be pushed down into the DataSource but the
FilterExec to be converted to ProjectionExec
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownFilter{}, true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: a@0 = foo, projection=[b@1, a@0]
+ - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c],
file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - ProjectionExec: expr=[b@1 as b, a@0 as a]
+ - DataSourceExec: file_groups={0 groups: []}, projection=[a, b,
c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
+ ",
+ );
+
+ // add a test where the filter is on a column that isn't included in the
output
+ let projection = vec![1];
+ let predicate = col_lit_predicate("a", "foo", schema());
+ let plan = Arc::new(
+ FilterExec::try_new(predicate, scan)
+ .unwrap()
+ .with_projection(Some(projection))
+ .unwrap(),
+ );
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownFilter{},true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: a@0 = foo, projection=[b@1]
+ - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c],
file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - ProjectionExec: expr=[b@1 as b]
+ - DataSourceExec: file_groups={0 groups: []}, projection=[a, b,
c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
+ "
+ );
+}
+
+#[test]
+fn test_push_down_through_transparent_nodes() {
+ // expect the predicate to be pushed down into the DataSource
+ let scan = test_scan(true);
+ let coalesce = Arc::new(CoalesceBatchesExec::new(scan, 1));
+ let predicate = col_lit_predicate("a", "foo", schema());
+ let filter = Arc::new(FilterExec::try_new(predicate, coalesce).unwrap());
+ let repartition = Arc::new(
+ RepartitionExec::try_new(filter,
Partitioning::RoundRobinBatch(1)).unwrap(),
+ );
+ let predicate = col_lit_predicate("b", "bar", schema());
+ let plan = Arc::new(FilterExec::try_new(predicate, repartition).unwrap());
+
+ // expect the predicate to be pushed down into the DataSource
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownFilter{},true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: b@1 = bar
+ - RepartitionExec: partitioning=RoundRobinBatch(1),
input_partitions=0
+ - FilterExec: a@0 = foo
+ - CoalesceBatchesExec: target_batch_size=1
+ - DataSourceExec: file_groups={0 groups: []}, projection=[a,
b, c], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - RepartitionExec: partitioning=RoundRobinBatch(1),
input_partitions=0
+ - CoalesceBatchesExec: target_batch_size=1
+ - DataSourceExec: file_groups={0 groups: []}, projection=[a, b,
c], file_type=test, pushdown_supported=true, predicate=b@1 = bar AND a@0 = foo
+ "
+ );
+}
+
+#[test]
+fn test_no_pushdown_through_aggregates() {
+ // There are 2 important points here:
+ // 1. The outer filter **is not** pushed down at all because we haven't
implemented pushdown support
+ // yet for AggregateExec.
+ // 2. The inner filter **is** pushed down into the DataSource.
+ let scan = test_scan(true);
+
+ let coalesce = Arc::new(CoalesceBatchesExec::new(scan, 10));
+
+ let filter = Arc::new(
+ FilterExec::try_new(col_lit_predicate("a", "foo", schema()),
coalesce).unwrap(),
+ );
+
+ let aggregate_expr =
+ vec![
+ AggregateExprBuilder::new(count_udaf(), vec![col("a",
schema()).unwrap()])
+ .schema(Arc::clone(schema()))
+ .alias("cnt")
+ .build()
+ .map(Arc::new)
+ .unwrap(),
+ ];
+ let group_by = PhysicalGroupBy::new_single(vec![
+ (col("a", schema()).unwrap(), "a".to_string()),
+ (col("b", schema()).unwrap(), "b".to_string()),
+ ]);
+ let aggregate = Arc::new(
+ AggregateExec::try_new(
+ AggregateMode::Final,
+ group_by,
+ aggregate_expr.clone(),
+ vec![None],
+ filter,
+ Arc::clone(schema()),
+ )
+ .unwrap(),
+ );
+
+ let coalesce = Arc::new(CoalesceBatchesExec::new(aggregate, 100));
+
+ let predicate = col_lit_predicate("b", "bar", schema());
+ let plan = Arc::new(FilterExec::try_new(predicate, coalesce).unwrap());
+
+ // expect the predicate to be pushed down into the DataSource
+ insta::assert_snapshot!(
+ OptimizationTest::new(plan, PushdownFilter{}, true),
+ @r"
+ OptimizationTest:
+ input:
+ - FilterExec: b@1 = bar
+ - CoalesceBatchesExec: target_batch_size=100
+ - AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt],
ordering_mode=PartiallySorted([0])
+ - FilterExec: a@0 = foo
+ - CoalesceBatchesExec: target_batch_size=10
+ - DataSourceExec: file_groups={0 groups: []}, projection=[a,
b, c], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - FilterExec: b@1 = bar
+ - CoalesceBatchesExec: target_batch_size=100
+ - AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt]
+ - CoalesceBatchesExec: target_batch_size=10
+ - DataSourceExec: file_groups={0 groups: []}, projection=[a,
b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
+ "
+ );
+}
+
+/// Schema:
+/// a: String
+/// b: String
+/// c: f64
+static TEST_SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
+
+fn schema() -> &'static SchemaRef {
+ TEST_SCHEMA.get_or_init(|| {
+ let fields = vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("c", DataType::Float64, false),
+ ];
+ Arc::new(Schema::new(fields))
+ })
+}
+
+/// Returns a predicate that is a binary expression col = lit
+fn col_lit_predicate(
+ column_name: &str,
+ scalar_value: impl Into<ScalarValue>,
+ schema: &Schema,
+) -> Arc<dyn PhysicalExpr> {
+ let scalar_value = scalar_value.into();
+ Arc::new(BinaryExpr::new(
+ Arc::new(Column::new_with_schema(column_name, schema).unwrap()),
+ Operator::Eq,
+ Arc::new(Literal::new(scalar_value)),
+ ))
+}
+
+/// A harness for testing physical optimizers.
+///
+/// You can use this to test the output of a physical optimizer rule using
insta snapshots
+#[derive(Debug)]
+pub struct OptimizationTest {
+ input: Vec<String>,
+ output: Result<Vec<String>, String>,
+}
+
+impl OptimizationTest {
+ pub fn new<O>(
+ input_plan: Arc<dyn ExecutionPlan>,
+ opt: O,
+ allow_pushdown_filters: bool,
+ ) -> Self
+ where
+ O: PhysicalOptimizerRule,
+ {
+ let mut parquet_pushdown_config = ConfigOptions::default();
+ parquet_pushdown_config.execution.parquet.pushdown_filters =
+ allow_pushdown_filters;
+
+ let input = format_execution_plan(&input_plan);
+ let input_schema = input_plan.schema();
+
+ let output_result = opt.optimize(input_plan, &parquet_pushdown_config);
+ let output = output_result
+ .and_then(|plan| {
+ if opt.schema_check() && (plan.schema() != input_schema) {
+ internal_err!(
+ "Schema mismatch:\n\nBefore:\n{:?}\n\nAfter:\n{:?}",
+ input_schema,
+ plan.schema()
+ )
+ } else {
+ Ok(plan)
+ }
+ })
+ .map(|plan| format_execution_plan(&plan))
+ .map_err(|e| e.to_string());
+
+ Self { input, output }
+ }
+}
+
+impl Display for OptimizationTest {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ writeln!(f, "OptimizationTest:")?;
+ writeln!(f, " input:")?;
+ for line in &self.input {
+ writeln!(f, " - {line}")?;
+ }
+ writeln!(f, " output:")?;
+ match &self.output {
+ Ok(output) => {
+ writeln!(f, " Ok:")?;
+ for line in output {
+ writeln!(f, " - {line}")?;
+ }
+ }
+ Err(err) => {
+ writeln!(f, " Err: {err}")?;
+ }
+ }
+ Ok(())
+ }
+}
+
+pub fn format_execution_plan(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
+ format_lines(&displayable(plan.as_ref()).indent(false).to_string())
+}
+
+fn format_lines(s: &str) -> Vec<String> {
+ s.trim().split('\n').map(|s| s.to_string()).collect()
+}
diff --git a/datafusion/datasource/src/file.rs
b/datafusion/datasource/src/file.rs
index 0066f39801..835285b21e 100644
--- a/datafusion/datasource/src/file.rs
+++ b/datafusion/datasource/src/file.rs
@@ -26,8 +26,12 @@ use crate::file_groups::FileGroupPartitioner;
use crate::file_scan_config::FileScanConfig;
use crate::file_stream::FileOpener;
use arrow::datatypes::SchemaRef;
-use datafusion_common::Statistics;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::{Result, Statistics};
use datafusion_physical_expr::LexOrdering;
+use datafusion_physical_plan::filter_pushdown::{
+ filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
+};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;
@@ -57,7 +61,7 @@ pub trait FileSource: Send + Sync {
/// Return execution plan metrics
fn metrics(&self) -> &ExecutionPlanMetricsSet;
/// Return projected statistics
- fn statistics(&self) -> datafusion_common::Result<Statistics>;
+ fn statistics(&self) -> Result<Statistics>;
/// String representation of file source such as "csv", "json", "parquet"
fn file_type(&self) -> &str;
/// Format FileType specific information
@@ -75,7 +79,7 @@ pub trait FileSource: Send + Sync {
repartition_file_min_size: usize,
output_ordering: Option<LexOrdering>,
config: &FileScanConfig,
- ) -> datafusion_common::Result<Option<FileScanConfig>> {
+ ) -> Result<Option<FileScanConfig>> {
if config.file_compression_type.is_compressed() ||
config.new_lines_in_values {
return Ok(None);
}
@@ -93,4 +97,16 @@ pub trait FileSource: Send + Sync {
}
Ok(None)
}
+
+ /// Try to push down filters into this FileSource.
+ /// See [`ExecutionPlan::try_pushdown_filters`] for more details.
+ ///
+ /// [`ExecutionPlan::try_pushdown_filters`]:
datafusion_physical_plan::ExecutionPlan::try_pushdown_filters
+ fn try_pushdown_filters(
+ &self,
+ fd: FilterDescription,
+ _config: &ConfigOptions,
+ ) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
+ Ok(filter_pushdown_not_supported(fd))
+ }
}
diff --git a/datafusion/datasource/src/file_scan_config.rs
b/datafusion/datasource/src/file_scan_config.rs
index b20f621474..fb756cc11f 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -23,6 +23,16 @@ use std::{
fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
};
+use crate::file_groups::FileGroup;
+use crate::{
+ display::FileGroupsDisplay,
+ file::FileSource,
+ file_compression_type::FileCompressionType,
+ file_stream::FileStream,
+ source::{DataSource, DataSourceExec},
+ statistics::MinMaxStatistics,
+ PartitionedFile,
+};
use arrow::{
array::{
ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch,
@@ -31,7 +41,9 @@ use arrow::{
buffer::Buffer,
datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef,
UInt16Type},
};
-use datafusion_common::{exec_err, ColumnStatistics, Constraints, Result,
Statistics};
+use datafusion_common::{
+ config::ConfigOptions, exec_err, ColumnStatistics, Constraints, Result,
Statistics,
+};
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_execution::{
object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
@@ -40,6 +52,10 @@ use datafusion_physical_expr::{
expressions::Column, EquivalenceProperties, LexOrdering, Partitioning,
PhysicalSortExpr,
};
+use datafusion_physical_plan::filter_pushdown::{
+ filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
+ FilterPushdownSupport,
+};
use datafusion_physical_plan::{
display::{display_orderings, ProjectSchemaDisplay},
metrics::ExecutionPlanMetricsSet,
@@ -48,17 +64,6 @@ use datafusion_physical_plan::{
};
use log::{debug, warn};
-use crate::file_groups::FileGroup;
-use crate::{
- display::FileGroupsDisplay,
- file::FileSource,
- file_compression_type::FileCompressionType,
- file_stream::FileStream,
- source::{DataSource, DataSourceExec},
- statistics::MinMaxStatistics,
- PartitionedFile,
-};
-
/// The base configurations for a [`DataSourceExec`], the a physical plan for
/// any given file format.
///
@@ -588,6 +593,46 @@ impl DataSource for FileScanConfig {
) as _
}))
}
+
+ fn try_pushdown_filters(
+ &self,
+ fd: FilterDescription,
+ config: &ConfigOptions,
+ ) -> Result<FilterPushdownResult<Arc<dyn DataSource>>> {
+ let FilterPushdownResult {
+ support,
+ remaining_description,
+ } = self.file_source.try_pushdown_filters(fd, config)?;
+
+ match support {
+ FilterPushdownSupport::Supported {
+ child_descriptions,
+ op,
+ revisit,
+ } => {
+ let new_data_source = Arc::new(
+ FileScanConfigBuilder::from(self.clone())
+ .with_source(op)
+ .build(),
+ );
+
+ debug_assert!(child_descriptions.is_empty());
+ debug_assert!(!revisit);
+
+ Ok(FilterPushdownResult {
+ support: FilterPushdownSupport::Supported {
+ child_descriptions,
+ op: new_data_source,
+ revisit,
+ },
+ remaining_description,
+ })
+ }
+ FilterPushdownSupport::NotSupported => {
+ Ok(filter_pushdown_not_supported(remaining_description))
+ }
+ }
+ }
}
impl FileScanConfig {
diff --git a/datafusion/datasource/src/source.rs
b/datafusion/datasource/src/source.rs
index 6c9122ce1a..2d6ea1a8b3 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -31,10 +31,14 @@ use datafusion_physical_plan::{
use crate::file_scan_config::FileScanConfig;
use datafusion_common::config::ConfigOptions;
-use datafusion_common::{Constraints, Statistics};
+use datafusion_common::{Constraints, Result, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_physical_plan::filter_pushdown::{
+ filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
+ FilterPushdownSupport,
+};
/// Common behaviors in Data Sources for both from Files and Memory.
///
@@ -51,7 +55,7 @@ pub trait DataSource: Send + Sync + Debug {
&self,
partition: usize,
context: Arc<TaskContext>,
- ) -> datafusion_common::Result<SendableRecordBatchStream>;
+ ) -> Result<SendableRecordBatchStream>;
fn as_any(&self) -> &dyn Any;
/// Format this source for display in explain plans
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
@@ -62,13 +66,13 @@ pub trait DataSource: Send + Sync + Debug {
_target_partitions: usize,
_repartition_file_min_size: usize,
_output_ordering: Option<LexOrdering>,
- ) -> datafusion_common::Result<Option<Arc<dyn DataSource>>> {
+ ) -> Result<Option<Arc<dyn DataSource>>> {
Ok(None)
}
fn output_partitioning(&self) -> Partitioning;
fn eq_properties(&self) -> EquivalenceProperties;
- fn statistics(&self) -> datafusion_common::Result<Statistics>;
+ fn statistics(&self) -> Result<Statistics>;
/// Return a copy of this DataSource with a new fetch limit
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
fn fetch(&self) -> Option<usize>;
@@ -78,7 +82,16 @@ pub trait DataSource: Send + Sync + Debug {
fn try_swapping_with_projection(
&self,
_projection: &ProjectionExec,
- ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>>;
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
+ /// Try to push down filters into this DataSource.
+ /// See [`ExecutionPlan::try_pushdown_filters`] for more details.
+ fn try_pushdown_filters(
+ &self,
+ fd: FilterDescription,
+ _config: &ConfigOptions,
+ ) -> Result<FilterPushdownResult<Arc<dyn DataSource>>> {
+ Ok(filter_pushdown_not_supported(fd))
+ }
}
/// [`ExecutionPlan`] handles different file formats like JSON, CSV, AVRO,
ARROW, PARQUET
@@ -131,7 +144,7 @@ impl ExecutionPlan for DataSourceExec {
fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
- ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+ ) -> Result<Arc<dyn ExecutionPlan>> {
Ok(self)
}
@@ -139,7 +152,7 @@ impl ExecutionPlan for DataSourceExec {
&self,
target_partitions: usize,
config: &ConfigOptions,
- ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let data_source = self.data_source.repartitioned(
target_partitions,
config.optimizer.repartition_file_min_size,
@@ -163,7 +176,7 @@ impl ExecutionPlan for DataSourceExec {
&self,
partition: usize,
context: Arc<TaskContext>,
- ) -> datafusion_common::Result<SendableRecordBatchStream> {
+ ) -> Result<SendableRecordBatchStream> {
self.data_source.open(partition, context)
}
@@ -171,7 +184,7 @@ impl ExecutionPlan for DataSourceExec {
Some(self.data_source.metrics().clone_inner())
}
- fn statistics(&self) -> datafusion_common::Result<Statistics> {
+ fn statistics(&self) -> Result<Statistics> {
self.data_source.statistics()
}
@@ -189,9 +202,45 @@ impl ExecutionPlan for DataSourceExec {
fn try_swapping_with_projection(
&self,
projection: &ProjectionExec,
- ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
self.data_source.try_swapping_with_projection(projection)
}
+
+ fn try_pushdown_filters(
+ &self,
+ fd: FilterDescription,
+ config: &ConfigOptions,
+ ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
+ let FilterPushdownResult {
+ support,
+ remaining_description,
+ } = self.data_source.try_pushdown_filters(fd, config)?;
+
+ match support {
+ FilterPushdownSupport::Supported {
+ child_descriptions,
+ op,
+ revisit,
+ } => {
+ let new_exec = Arc::new(DataSourceExec::new(op));
+
+ debug_assert!(child_descriptions.is_empty());
+ debug_assert!(!revisit);
+
+ Ok(FilterPushdownResult {
+ support: FilterPushdownSupport::Supported {
+ child_descriptions,
+ op: new_exec,
+ revisit,
+ },
+ remaining_description,
+ })
+ }
+ FilterPushdownSupport::NotSupported => {
+ Ok(filter_pushdown_not_supported(remaining_description))
+ }
+ }
+ }
}
impl DataSourceExec {
@@ -254,3 +303,13 @@ impl DataSourceExec {
})
}
}
+
+/// Create a new `DataSourceExec` from a `DataSource`
+impl<S> From<S> for DataSourceExec
+where
+ S: DataSource + 'static,
+{
+ fn from(source: S) -> Self {
+ Self::new(Arc::new(source))
+ }
+}
diff --git a/datafusion/physical-optimizer/src/lib.rs
b/datafusion/physical-optimizer/src/lib.rs
index 35503f3b0b..57dac21b6e 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -36,6 +36,7 @@ pub mod optimizer;
pub mod output_requirements;
pub mod projection_pushdown;
pub mod pruning;
+pub mod push_down_filter;
pub mod sanity_checker;
pub mod topk_aggregation;
pub mod update_aggr_exprs;
diff --git a/datafusion/physical-optimizer/src/optimizer.rs
b/datafusion/physical-optimizer/src/optimizer.rs
index bab31150e2..d4ff7d6b9e 100644
--- a/datafusion/physical-optimizer/src/optimizer.rs
+++ b/datafusion/physical-optimizer/src/optimizer.rs
@@ -30,6 +30,7 @@ use crate::limit_pushdown::LimitPushdown;
use crate::limited_distinct_aggregation::LimitedDistinctAggregation;
use crate::output_requirements::OutputRequirements;
use crate::projection_pushdown::ProjectionPushdown;
+use crate::push_down_filter::PushdownFilter;
use crate::sanity_checker::SanityCheckPlan;
use crate::topk_aggregation::TopKAggregation;
use crate::update_aggr_exprs::OptimizeAggregateOrder;
@@ -121,6 +122,10 @@ impl PhysicalOptimizer {
// into an `order by max(x) limit y`. In this case it will copy
the limit value down
// to the aggregation, allowing it to use only y number of
accumulators.
Arc::new(TopKAggregation::new()),
+ // The FilterPushdown rule tries to push down filters as far as it
can.
+ // For example, it will push down filtering from a `FilterExec` to
+ // a `DataSourceExec`, or from a `TopK`'s current state to a
`DataSourceExec`.
+ Arc::new(PushdownFilter::new()),
// The LimitPushdown rule tries to push limits down as far as
possible,
// replacing operators with fetching variants, or adding limits
// past operators that support limit pushdown.
diff --git a/datafusion/physical-optimizer/src/push_down_filter.rs
b/datafusion/physical-optimizer/src/push_down_filter.rs
new file mode 100644
index 0000000000..80201454d0
--- /dev/null
+++ b/datafusion/physical-optimizer/src/push_down_filter.rs
@@ -0,0 +1,535 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::PhysicalOptimizerRule;
+
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
+use datafusion_common::{config::ConfigOptions, Result};
+use datafusion_physical_expr::conjunction;
+use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::filter_pushdown::{
+ FilterDescription, FilterPushdownResult, FilterPushdownSupport,
+};
+use datafusion_physical_plan::tree_node::PlanContext;
+use datafusion_physical_plan::ExecutionPlan;
+
+/// Attempts to recursively push given filters from the top of the tree into
leaves.
+///
+/// # Default Implementation
+///
+/// The default implementation in [`ExecutionPlan::try_pushdown_filters`] is a
no-op
+/// that assumes that:
+///
+/// * Parent filters can't be passed onto children.
+/// * This node has no filters to contribute.
+///
+/// # Example: Push filter into a `DataSourceExec`
+///
+/// For example, consider the following plan:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec │
+/// └──────────────────────┘
+/// │
+/// ▼
+/// ┌──────────────────────┐
+/// │ FilterExec │
+/// │ filters = [ id=1] │
+/// └──────────────────────┘
+/// │
+/// ▼
+/// ┌──────────────────────┐
+/// │ DataSourceExec │
+/// │ projection = * │
+/// └──────────────────────┘
+/// ```
+///
+/// Our goal is to move the `id = 1` filter from the [`FilterExec`] node to
the `DataSourceExec` node.
+///
+/// If this filter is selective pushing it into the scan can avoid massive
+/// amounts of data being read from the source (the projection is `*` so all
+/// matching columns are read).
+///
+/// The new plan looks like:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec │
+/// └──────────────────────┘
+/// │
+/// ▼
+/// ┌──────────────────────┐
+/// │ DataSourceExec │
+/// │ projection = * │
+/// │ filters = [ id=1] │
+/// └──────────────────────┘
+/// ```
+///
+/// # Example: Push filters with `ProjectionExec`
+///
+/// Let's consider a more complex example involving a [`ProjectionExec`]
+/// node in between the [`FilterExec`] and `DataSourceExec` nodes that
+/// creates a new column that the filter depends on.
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec │
+/// └──────────────────────┘
+/// │
+/// ▼
+/// ┌──────────────────────┐
+/// │ FilterExec │
+/// │ filters = │
+/// │ [cost>50,id=1] │
+/// └──────────────────────┘
+/// │
+/// ▼
+/// ┌──────────────────────┐
+/// │ ProjectionExec │
+/// │ cost = price * 1.2 │
+/// └──────────────────────┘
+/// │
+/// ▼
+/// ┌──────────────────────┐
+/// │ DataSourceExec │
+/// │ projection = * │
+/// └──────────────────────┘
+/// ```
+///
+/// We want to push down the filters `[id=1]` to the `DataSourceExec` node,
+/// but can't push down `cost>50` because it requires the [`ProjectionExec`]
+/// node to be executed first. A simple thing to do would be to split up the
+/// filter into two separate filters and push down the first one:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec │
+/// └──────────────────────┘
+/// │
+/// ▼
+/// ┌──────────────────────┐
+/// │ FilterExec │
+/// │ filters = │
+/// │ [cost>50] │
+/// └──────────────────────┘
+/// │
+/// ▼
+/// ┌──────────────────────┐
+/// │ ProjectionExec │
+/// │ cost = price * 1.2 │
+/// └──────────────────────┘
+/// │
+/// ▼
+/// ┌──────────────────────┐
+/// │ DataSourceExec │
+/// │ projection = * │
+/// │ filters = [ id=1] │
+/// └──────────────────────┘
+/// ```
+///
+/// We can actually however do better by pushing down `price * 1.2 > 50`
+/// instead of `cost > 50`:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec │
+/// └──────────────────────┘
+/// │
+/// ▼
+/// ┌──────────────────────┐
+/// │ ProjectionExec │
+/// │ cost = price * 1.2 │
+/// └──────────────────────┘
+/// │
+/// ▼
+/// ┌──────────────────────┐
+/// │ DataSourceExec │
+/// │ projection = * │
+/// │ filters = [id=1, │
+/// │ price * 1.2 > 50] │
+/// └──────────────────────┘
+/// ```
+///
+/// # Example: Push filters within a subtree
+///
+/// There are also cases where we may be able to push down filters within a
+/// subtree but not the entire tree. A good example of this is aggregation
+/// nodes:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ ProjectionExec │
+/// │ projection = * │
+/// └──────────────────────┘
+/// │
+/// ▼
+/// ┌──────────────────────┐
+/// │ FilterExec │
+/// │ filters = [sum > 10] │
+/// └──────────────────────┘
+/// │
+/// ▼
+/// ┌───────────────────────┐
+/// │ AggregateExec │
+/// │ group by = [id] │
+/// │ aggregate = │
+/// │ [sum(price)] │
+/// └───────────────────────┘
+/// │
+/// ▼
+/// ┌──────────────────────┐
+/// │ FilterExec │
+/// │ filters = [id=1] │
+/// └──────────────────────┘
+/// │
+/// ▼
+/// ┌──────────────────────┐
+/// │ DataSourceExec │
+/// │ projection = * │
+/// └──────────────────────┘
+/// ```
+///
+/// The transformation here is to push down the `id=1` filter to the
+/// `DataSourceExec` node:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ ProjectionExec │
+/// │ projection = * │
+/// └──────────────────────┘
+/// │
+/// ▼
+/// ┌──────────────────────┐
+/// │ FilterExec │
+/// │ filters = [sum > 10] │
+/// └──────────────────────┘
+/// │
+/// ▼
+/// ┌───────────────────────┐
+/// │ AggregateExec │
+/// │ group by = [id] │
+/// │ aggregate = │
+/// │ [sum(price)] │
+/// └───────────────────────┘
+/// │
+/// ▼
+/// ┌──────────────────────┐
+/// │ DataSourceExec │
+/// │ projection = * │
+/// │ filters = [id=1] │
+/// └──────────────────────┘
+/// ```
+///
+/// The point here is that:
+/// 1. We cannot push down `sum > 10` through the [`AggregateExec`] node into
the `DataSourceExec` node.
+/// Any filters above the [`AggregateExec`] node are not pushed down.
+/// This is determined by calling [`ExecutionPlan::try_pushdown_filters`]
on the [`AggregateExec`] node.
+/// 2. We need to keep recursing into the tree so that we can discover the
other [`FilterExec`] node and push
+/// down the `id=1` filter.
+///
+/// # Example: Push filters through Joins
+///
+/// It is also possible to push down filters through joins and filters that
+/// originate from joins. For example, a hash join where we build a hash
+/// table of the left side and probe the right side (ignoring why we would
+/// choose this order, typically it depends on the size of each table,
+/// etc.).
+///
+/// ```text
+/// ┌─────────────────────┐
+/// │ FilterExec │
+/// │ filters = │
+/// │ [d.size > 100] │
+/// └─────────────────────┘
+/// │
+/// │
+/// ┌──────────▼──────────┐
+/// │ │
+/// │ HashJoinExec │
+/// │ [u.dept@hash(d.id)] │
+/// │ │
+/// └─────────────────────┘
+/// │
+/// ┌────────────┴────────────┐
+/// ┌──────────▼──────────┐ ┌──────────▼──────────┐
+/// │ DataSourceExec │ │ DataSourceExec │
+/// │ alias [users as u] │ │ alias [dept as d] │
+/// │ │ │ │
+/// └─────────────────────┘ └─────────────────────┘
+/// ```
+///
+/// There are two pushdowns we can do here:
+/// 1. Push down the `d.size > 100` filter through the `HashJoinExec` node to
the `DataSourceExec`
+/// node for the `departments` table.
+/// 2. Push down the hash table state from the `HashJoinExec` node to the
`DataSourceExec` node to avoid reading
+/// rows from the `users` table that will be eliminated by the join.
+/// This can be done via a bloom filter or similar and is not (yet)
supported
+/// in DataFusion. See <https://github.com/apache/datafusion/issues/7955>.
+///
+/// ```text
+/// ┌─────────────────────┐
+/// │ │
+/// │ HashJoinExec │
+/// │ [u.dept@hash(d.id)] │
+/// │ │
+/// └─────────────────────┘
+/// │
+/// ┌────────────┴────────────┐
+/// ┌──────────▼──────────┐ ┌──────────▼──────────┐
+/// │ DataSourceExec │ │ DataSourceExec │
+/// │ alias [users as u] │ │ alias [dept as d] │
+/// │ filters = │ │ filters = │
+/// │ [dept@hash(d.id)] │ │ [ d.size > 100] │
+/// └─────────────────────┘ └─────────────────────┘
+/// ```
+///
+/// You may notice in this case that the filter is *dynamic*: the hash table
+/// is built _after_ the `departments` table is read and at runtime. We
+/// don't have a concrete `InList` filter or similar to push down at
+/// optimization time. These sorts of dynamic filters are handled by
+/// building a specialized [`PhysicalExpr`] that can be evaluated at runtime
+/// and internally maintains a reference to the hash table or other state.
+///
+/// To make working with these sorts of dynamic filters more tractable we have
the method [`PhysicalExpr::snapshot`]
+/// which attempts to simplify a dynamic filter into a "basic" non-dynamic
filter.
+/// For a join this could mean converting it to an `InList` filter or a
min/max filter for example.
+/// See `datafusion/physical-plan/src/dynamic_filters.rs` for more details.
+///
+/// # Example: Push TopK filters into Scans
+///
+/// Another form of dynamic filter is pushing down the state of a `TopK`
+/// operator for queries like `SELECT * FROM t ORDER BY id LIMIT 10`:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ TopK │
+/// │ limit = 10 │
+/// │ order by = [id] │
+/// └──────────────────────┘
+/// │
+/// ▼
+/// ┌──────────────────────┐
+/// │ DataSourceExec │
+/// │ projection = * │
+/// └──────────────────────┘
+/// ```
+///
+/// We can avoid large amounts of data processing by transforming this into:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ TopK │
+/// │ limit = 10 │
+/// │ order by = [id] │
+/// └──────────────────────┘
+/// │
+/// ▼
+/// ┌──────────────────────┐
+/// │ DataSourceExec │
+/// │ projection = * │
+/// │ filters = │
+/// │ [id < @ TopKHeap] │
+/// └──────────────────────┘
+/// ```
+///
+/// Now as we fill our `TopK` heap we can push down the state of the heap to
+/// the `DataSourceExec` node to avoid reading files / row groups / pages /
+/// rows that could not possibly be in the top 10.
+///
+/// This is not yet implemented in DataFusion. See
+/// <https://github.com/apache/datafusion/issues/15037>
+///
+/// [`PhysicalExpr`]: datafusion_physical_plan::PhysicalExpr
+/// [`PhysicalExpr::snapshot`]:
datafusion_physical_plan::PhysicalExpr::snapshot
+/// [`FilterExec`]: datafusion_physical_plan::filter::FilterExec
+/// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec
+/// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec
+#[derive(Debug)]
+pub struct PushdownFilter {}
+
+impl Default for PushdownFilter {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+pub type FilterDescriptionContext = PlanContext<FilterDescription>;
+
+impl PhysicalOptimizerRule for PushdownFilter {
+ fn optimize(
+ &self,
+ plan: Arc<dyn ExecutionPlan>,
+ config: &ConfigOptions,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let context = FilterDescriptionContext::new_default(plan);
+
+ context
+ .transform_up(|node| {
+ if node.plan.as_any().downcast_ref::<FilterExec>().is_some() {
+ let initial_plan = Arc::clone(&node.plan);
+ let mut accept_updated = false;
+ let updated_node = node.transform_down(|filter_node| {
+ Self::try_pushdown(filter_node, config, &mut
accept_updated)
+ });
+
+ if accept_updated {
+ updated_node
+ } else {
+
Ok(Transformed::no(FilterDescriptionContext::new_default(
+ initial_plan,
+ )))
+ }
+ }
+ // Other filter-introducing operators extend here
+ else {
+ Ok(Transformed::no(node))
+ }
+ })
+ .map(|updated| updated.data.plan)
+ }
+
+ fn name(&self) -> &str {
+ "PushdownFilter"
+ }
+
+ fn schema_check(&self) -> bool {
+ true // Filter pushdown does not change the schema of the plan
+ }
+}
+
+impl PushdownFilter {
+ pub fn new() -> Self {
+ Self {}
+ }
+
+ fn try_pushdown(
+ mut node: FilterDescriptionContext,
+ config: &ConfigOptions,
+ accept_updated: &mut bool,
+ ) -> Result<Transformed<FilterDescriptionContext>> {
+ let initial_description = FilterDescription {
+ filters: node.data.take_description(),
+ };
+
+ let FilterPushdownResult {
+ support,
+ remaining_description,
+ } = node
+ .plan
+ .try_pushdown_filters(initial_description, config)?;
+
+ match support {
+ FilterPushdownSupport::Supported {
+ mut child_descriptions,
+ op,
+ revisit,
+ } => {
+ if revisit {
+ // This check handles cases where the current operator is
entirely removed
+ // from the plan and replaced with its child. In such
cases, to not skip
+ // over the new node, we need to explicitly re-apply this
pushdown logic
+ // to the new node.
+ //
+ // TODO: If TreeNodeRecursion supports a Revisit mechanism
in the future,
+ // this manual recursion could be removed.
+
+ // If the operator is removed, it should not leave any
filters as remaining
+ debug_assert!(remaining_description.filters.is_empty());
+ // Operators having 2 children cannot be removed
+ debug_assert_eq!(child_descriptions.len(), 1);
+ debug_assert_eq!(node.children.len(), 1);
+
+ node.plan = op;
+ node.data = child_descriptions.swap_remove(0);
+ node.children = node.children.swap_remove(0).children;
+ Self::try_pushdown(node, config, accept_updated)
+ } else {
+ if remaining_description.filters.is_empty() {
+ // Filter can be pushed down safely
+ node.plan = op;
+ if node.children.is_empty() {
+ *accept_updated = true;
+ } else {
+ for (child, descr) in
+
node.children.iter_mut().zip(child_descriptions)
+ {
+ child.data = descr;
+ }
+ }
+ } else {
+ // Filter cannot be pushed down
+ node = insert_filter_exec(
+ node,
+ child_descriptions,
+ remaining_description,
+ )?;
+ }
+ Ok(Transformed::yes(node))
+ }
+ }
+ FilterPushdownSupport::NotSupported => {
+ if remaining_description.filters.is_empty() {
+ Ok(Transformed {
+ data: node,
+ transformed: false,
+ tnr: TreeNodeRecursion::Stop,
+ })
+ } else {
+ node = insert_filter_exec(
+ node,
+ vec![FilterDescription::empty(); 1],
+ remaining_description,
+ )?;
+ Ok(Transformed {
+ data: node,
+ transformed: true,
+ tnr: TreeNodeRecursion::Stop,
+ })
+ }
+ }
+ }
+ }
+}
+
+fn insert_filter_exec(
+ node: FilterDescriptionContext,
+ mut child_descriptions: Vec<FilterDescription>,
+ remaining_description: FilterDescription,
+) -> Result<FilterDescriptionContext> {
+ let mut new_child_node = node;
+
+ // Filter has one child
+ if !child_descriptions.is_empty() {
+ debug_assert_eq!(child_descriptions.len(), 1);
+ new_child_node.data = child_descriptions.swap_remove(0);
+ }
+ let new_plan = Arc::new(FilterExec::try_new(
+ conjunction(remaining_description.filters),
+ Arc::clone(&new_child_node.plan),
+ )?);
+ let new_children = vec![new_child_node];
+ let new_data = FilterDescription::empty();
+
+ Ok(FilterDescriptionContext::new(
+ new_plan,
+ new_data,
+ new_children,
+ ))
+}
diff --git a/datafusion/physical-plan/src/coalesce_batches.rs
b/datafusion/physical-plan/src/coalesce_batches.rs
index 5244038b9a..faab5fdc5e 100644
--- a/datafusion/physical-plan/src/coalesce_batches.rs
+++ b/datafusion/physical-plan/src/coalesce_batches.rs
@@ -35,6 +35,10 @@ use datafusion_execution::TaskContext;
use crate::coalesce::{BatchCoalescer, CoalescerState};
use crate::execution_plan::CardinalityEffect;
+use crate::filter_pushdown::{
+ filter_pushdown_transparent, FilterDescription, FilterPushdownResult,
+};
+use datafusion_common::config::ConfigOptions;
use futures::ready;
use futures::stream::{Stream, StreamExt};
@@ -212,6 +216,17 @@ impl ExecutionPlan for CoalesceBatchesExec {
fn cardinality_effect(&self) -> CardinalityEffect {
CardinalityEffect::Equal
}
+
+ fn try_pushdown_filters(
+ &self,
+ fd: FilterDescription,
+ _config: &ConfigOptions,
+ ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
+ Ok(filter_pushdown_transparent::<Arc<dyn ExecutionPlan>>(
+ Arc::new(self.clone()),
+ fd,
+ ))
+ }
}
/// Stream for [`CoalesceBatchesExec`]. See [`CoalesceBatchesExec`] for more
details.
diff --git a/datafusion/physical-plan/src/execution_plan.rs
b/datafusion/physical-plan/src/execution_plan.rs
index 2bc5706ee0..2b6eac7be0 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -16,6 +16,9 @@
// under the License.
pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType,
VerboseDisplay};
+use crate::filter_pushdown::{
+ filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
+};
pub use crate::metrics::Metric;
pub use crate::ordering::InputOrderMode;
pub use crate::stream::EmptyRecordBatchStream;
@@ -467,6 +470,41 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
Ok(None)
}
+
+ /// Attempts to recursively push given filters from the top of the tree
into leaves.
+ ///
+ /// This is used for various optimizations, such as:
+ ///
+ /// * Pushing down filters into scans in general to minimize the amount of
data that needs to be materialized.
+ /// * Pushing down dynamic filters from operators like TopK and Joins into
scans.
+ ///
+ /// Generally the further down (closer to leaf nodes) that filters can be
pushed, the better.
+ ///
+ /// Consider the case of a query such as `SELECT * FROM t WHERE a = 1 AND
b = 2`.
+ /// With no filter pushdown the scan needs to read and materialize all the
data from `t` and then filter based on `a` and `b`.
+ /// With filter pushdown into the scan it can first read only `a`, then
`b` and keep track of
+ /// which rows match the filter.
+ /// Then only for rows that match the filter does it have to materialize
the rest of the columns.
+ ///
+ /// # Default Implementation
+ ///
+ /// The default implementation assumes:
+ /// * Parent filters can't be passed onto children.
+ /// * This node has no filters to contribute.
+ ///
+ /// # Implementation Notes
+ ///
+ /// Most of the actual logic is implemented as a Physical Optimizer rule.
+ /// See [`PushdownFilter`] for more details.
+ ///
+ /// [`PushdownFilter`]:
https://docs.rs/datafusion/latest/datafusion/physical_optimizer/filter_pushdown/struct.PushdownFilter.html
+ fn try_pushdown_filters(
+ &self,
+ fd: FilterDescription,
+ _config: &ConfigOptions,
+ ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
+ Ok(filter_pushdown_not_supported(fd))
+ }
}
/// [`ExecutionPlan`] Invariant Level
@@ -519,13 +557,15 @@ pub trait ExecutionPlanProperties {
/// If this ExecutionPlan makes no changes to the schema of the rows
flowing
/// through it or how columns within each row relate to each other, it
/// should return the equivalence properties of its input. For
- /// example, since `FilterExec` may remove rows from its input, but does
not
+ /// example, since [`FilterExec`] may remove rows from its input, but does
not
/// otherwise modify them, it preserves its input equivalence properties.
/// However, since `ProjectionExec` may calculate derived expressions, it
/// needs special handling.
///
/// See also [`ExecutionPlan::maintains_input_order`] and
[`Self::output_ordering`]
/// for related concepts.
+ ///
+ /// [`FilterExec`]: crate::filter::FilterExec
fn equivalence_properties(&self) -> &EquivalenceProperties;
}
diff --git a/datafusion/physical-plan/src/filter.rs
b/datafusion/physical-plan/src/filter.rs
index a8a9973ea0..95fa67025e 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -26,6 +26,9 @@ use super::{
};
use crate::common::can_project;
use crate::execution_plan::CardinalityEffect;
+use crate::filter_pushdown::{
+ FilterDescription, FilterPushdownResult, FilterPushdownSupport,
+};
use crate::projection::{
make_with_child, try_embed_projection, update_expr, EmbeddedProjection,
ProjectionExec,
@@ -39,6 +42,7 @@ use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_boolean_array;
+use datafusion_common::config::ConfigOptions;
use datafusion_common::stats::Precision;
use datafusion_common::{
internal_err, plan_err, project_schema, DataFusionError, Result,
ScalarValue,
@@ -46,7 +50,7 @@ use datafusion_common::{
use datafusion_execution::TaskContext;
use datafusion_expr::Operator;
use datafusion_physical_expr::equivalence::ProjectionMapping;
-use datafusion_physical_expr::expressions::BinaryExpr;
+use datafusion_physical_expr::expressions::{BinaryExpr, Column};
use datafusion_physical_expr::intervals::utils::check_support;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{
@@ -433,6 +437,56 @@ impl ExecutionPlan for FilterExec {
}
try_embed_projection(projection, self)
}
+
+ fn try_pushdown_filters(
+ &self,
+ mut fd: FilterDescription,
+ _config: &ConfigOptions,
+ ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
+ // Extend the filter descriptions
+ fd.filters.push(Arc::clone(&self.predicate));
+
+ // Extract the information
+ let child_descriptions = vec![fd];
+ let remaining_description = FilterDescription { filters: vec![] };
+ let filter_input = Arc::clone(self.input());
+
+ if let Some(projection_indices) = self.projection.as_ref() {
+ // Push the filters down, but leave a ProjectionExec behind,
instead of the FilterExec
+ let filter_child_schema = filter_input.schema();
+ let proj_exprs = projection_indices
+ .iter()
+ .map(|p| {
+ let field = filter_child_schema.field(*p).clone();
+ (
+ Arc::new(Column::new(field.name(), *p)) as Arc<dyn
PhysicalExpr>,
+ field.name().to_string(),
+ )
+ })
+ .collect::<Vec<_>>();
+ let projection_exec =
+ Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?)
as _;
+
+ Ok(FilterPushdownResult {
+ support: FilterPushdownSupport::Supported {
+ child_descriptions,
+ op: projection_exec,
+ revisit: false,
+ },
+ remaining_description,
+ })
+ } else {
+ // Pull out the FilterExec, and inform the rule as it should be
re-run
+ Ok(FilterPushdownResult {
+ support: FilterPushdownSupport::Supported {
+ child_descriptions,
+ op: filter_input,
+ revisit: true,
+ },
+ remaining_description,
+ })
+ }
+ }
}
impl EmbeddedProjection for FilterExec {
diff --git a/datafusion/physical-plan/src/filter_pushdown.rs
b/datafusion/physical-plan/src/filter_pushdown.rs
new file mode 100644
index 0000000000..38f5aef592
--- /dev/null
+++ b/datafusion/physical-plan/src/filter_pushdown.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::ExecutionPlan;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+
+#[derive(Clone, Debug)]
+pub struct FilterDescription {
+ /// Expressions coming from the parent nodes
+ pub filters: Vec<Arc<dyn PhysicalExpr>>,
+}
+
+impl Default for FilterDescription {
+ fn default() -> Self {
+ Self::empty()
+ }
+}
+
+impl FilterDescription {
+ /// Takes the filters out of the struct, leaving an empty vector in its
place.
+ pub fn take_description(&mut self) -> Vec<Arc<dyn PhysicalExpr>> {
+ std::mem::take(&mut self.filters)
+ }
+
+ pub fn empty() -> FilterDescription {
+ Self { filters: vec![] }
+ }
+}
+
+#[derive(Debug)]
+pub enum FilterPushdownSupport<T> {
+ Supported {
+ // Filter predicates which can be pushed down through the operator.
+ // NOTE that these are not placed into any operator.
+ child_descriptions: Vec<FilterDescription>,
+ // Possibly updated new operator
+ op: T,
+ // Whether the node is removed from the plan and the rule should be
re-run manually
+ // on the new node.
+ // TODO: If TreeNodeRecursion supports Revisit mechanism, this flag
can be removed
+ revisit: bool,
+ },
+ NotSupported,
+}
+
+#[derive(Debug)]
+pub struct FilterPushdownResult<T> {
+ pub support: FilterPushdownSupport<T>,
+ // Filters which cannot be pushed down through the operator.
+ // NOTE that the caller of try_pushdown_filters() should handle these
remaining predicates,
+ // possibly introducing a FilterExec on top of this operator.
+ pub remaining_description: FilterDescription,
+}
+
+pub fn filter_pushdown_not_supported<T>(
+ remaining_description: FilterDescription,
+) -> FilterPushdownResult<T> {
+ FilterPushdownResult {
+ support: FilterPushdownSupport::NotSupported,
+ remaining_description,
+ }
+}
+
+pub fn filter_pushdown_transparent<T>(
+ plan: Arc<dyn ExecutionPlan>,
+ fd: FilterDescription,
+) -> FilterPushdownResult<Arc<dyn ExecutionPlan>> {
+ let child_descriptions = vec![fd];
+ let remaining_description = FilterDescription::empty();
+
+ FilterPushdownResult {
+ support: FilterPushdownSupport::Supported {
+ child_descriptions,
+ op: plan,
+ revisit: false,
+ },
+ remaining_description,
+ }
+}
diff --git a/datafusion/physical-plan/src/lib.rs
b/datafusion/physical-plan/src/lib.rs
index b256e615b2..a1862554b3 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -67,6 +67,7 @@ pub mod empty;
pub mod execution_plan;
pub mod explain;
pub mod filter;
+pub mod filter_pushdown;
pub mod joins;
pub mod limit;
pub mod memory;
diff --git a/datafusion/physical-plan/src/repartition/mod.rs
b/datafusion/physical-plan/src/repartition/mod.rs
index 71479ffa96..c480fc2aba 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -43,6 +43,7 @@ use crate::{DisplayFormatType, ExecutionPlan, Partitioning,
PlanProperties, Stat
use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions};
use arrow::compute::take_arrays;
use arrow::datatypes::{SchemaRef, UInt32Type};
+use datafusion_common::config::ConfigOptions;
use datafusion_common::utils::transpose;
use datafusion_common::HashMap;
use datafusion_common::{not_impl_err, DataFusionError, Result};
@@ -52,6 +53,9 @@ use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use crate::filter_pushdown::{
+ filter_pushdown_transparent, FilterDescription, FilterPushdownResult,
+};
use futures::stream::Stream;
use futures::{FutureExt, StreamExt, TryStreamExt};
use log::trace;
@@ -730,6 +734,17 @@ impl ExecutionPlan for RepartitionExec {
new_partitioning,
)?)))
}
+
+ fn try_pushdown_filters(
+ &self,
+ fd: FilterDescription,
+ _config: &ConfigOptions,
+ ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
+ Ok(filter_pushdown_transparent::<Arc<dyn ExecutionPlan>>(
+ Arc::new(self.clone()),
+ fd,
+ ))
+ }
}
impl RepartitionExec {
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt
b/datafusion/sqllogictest/test_files/aggregate.slt
index a55ac079aa..35bbc75814 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -6795,4 +6795,3 @@ select c2, count(*) from test WHERE 1 = 1 group by c2;
4 1
5 1
6 1
-
diff --git a/datafusion/sqllogictest/test_files/array.slt
b/datafusion/sqllogictest/test_files/array.slt
index f165d3bf66..e780d6c8b2 100644
--- a/datafusion/sqllogictest/test_files/array.slt
+++ b/datafusion/sqllogictest/test_files/array.slt
@@ -5992,7 +5992,7 @@ logical_plan
02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
03)----SubqueryAlias: test
04)------SubqueryAlias: t
-05)--------Projection:
+05)--------Projection:
06)----------Filter: substr(CAST(md5(CAST(tmp_table.value AS Utf8)) AS Utf8),
Int64(1), Int64(32)) IN ([Utf8View("7f4b18de3cfeb9b4ac78c381ee2ad278"),
Utf8View("a"), Utf8View("b"), Utf8View("c")])
07)------------TableScan: tmp_table projection=[value]
physical_plan
@@ -6021,7 +6021,7 @@ logical_plan
02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
03)----SubqueryAlias: test
04)------SubqueryAlias: t
-05)--------Projection:
+05)--------Projection:
06)----------Filter: substr(CAST(md5(CAST(tmp_table.value AS Utf8)) AS Utf8),
Int64(1), Int64(32)) IN ([Utf8View("7f4b18de3cfeb9b4ac78c381ee2ad278"),
Utf8View("a"), Utf8View("b"), Utf8View("c")])
07)------------TableScan: tmp_table projection=[value]
physical_plan
@@ -6050,7 +6050,7 @@ logical_plan
02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
03)----SubqueryAlias: test
04)------SubqueryAlias: t
-05)--------Projection:
+05)--------Projection:
06)----------Filter: substr(CAST(md5(CAST(tmp_table.value AS Utf8)) AS Utf8),
Int64(1), Int64(32)) IN ([Utf8View("7f4b18de3cfeb9b4ac78c381ee2ad278"),
Utf8View("a"), Utf8View("b"), Utf8View("c")])
07)------------TableScan: tmp_table projection=[value]
physical_plan
@@ -6081,7 +6081,7 @@ logical_plan
02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
03)----SubqueryAlias: test
04)------SubqueryAlias: t
-05)--------Projection:
+05)--------Projection:
06)----------Filter: array_has(LargeList([7f4b18de3cfeb9b4ac78c381ee2ad278, a,
b, c]), substr(CAST(md5(CAST(tmp_table.value AS Utf8)) AS Utf8), Int64(1),
Int64(32)))
07)------------TableScan: tmp_table projection=[value]
physical_plan
@@ -6110,7 +6110,7 @@ logical_plan
02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
03)----SubqueryAlias: test
04)------SubqueryAlias: t
-05)--------Projection:
+05)--------Projection:
06)----------Filter: substr(CAST(md5(CAST(tmp_table.value AS Utf8)) AS Utf8),
Int64(1), Int64(32)) IN ([Utf8View("7f4b18de3cfeb9b4ac78c381ee2ad278"),
Utf8View("a"), Utf8View("b"), Utf8View("c")])
07)------------TableScan: tmp_table projection=[value]
physical_plan
@@ -6130,7 +6130,7 @@ select count(*) from test WHERE array_has([needle],
needle);
----
100000
-# The optimizer does not currently eliminate the filter;
+# The optimizer does not currently eliminate the filter;
# Instead, it's rewritten as `IS NULL OR NOT NULL` due to SQL null semantics
query TT
explain with test AS (SELECT substr(md5(i::text)::text, 1, 32) as needle FROM
generate_series(1, 100000) t(i))
diff --git a/datafusion/sqllogictest/test_files/cte.slt
b/datafusion/sqllogictest/test_files/cte.slt
index e019af9775..32320a06f4 100644
--- a/datafusion/sqllogictest/test_files/cte.slt
+++ b/datafusion/sqllogictest/test_files/cte.slt
@@ -722,7 +722,7 @@ logical_plan
03)----Projection: Int64(1) AS val
04)------EmptyRelation
05)----Projection: Int64(2) AS val
-06)------Cross Join:
+06)------Cross Join:
07)--------Filter: recursive_cte.val < Int64(2)
08)----------TableScan: recursive_cte
09)--------SubqueryAlias: sub_cte
diff --git a/datafusion/sqllogictest/test_files/dictionary.slt
b/datafusion/sqllogictest/test_files/dictionary.slt
index 1769f42c2d..d241e61f33 100644
--- a/datafusion/sqllogictest/test_files/dictionary.slt
+++ b/datafusion/sqllogictest/test_files/dictionary.slt
@@ -456,4 +456,4 @@ statement ok
CREATE TABLE test0 AS VALUES ('foo',1), ('bar',2), ('foo',3);
statement ok
-COPY (SELECT arrow_cast(column1, 'Dictionary(Int32, Utf8)') AS column1,
column2 FROM test0) TO 'test_files/scratch/copy/part_dict_test' STORED AS
PARQUET PARTITIONED BY (column1);
\ No newline at end of file
+COPY (SELECT arrow_cast(column1, 'Dictionary(Int32, Utf8)') AS column1,
column2 FROM test0) TO 'test_files/scratch/copy/part_dict_test' STORED AS
PARQUET PARTITIONED BY (column1);
diff --git a/datafusion/sqllogictest/test_files/explain.slt
b/datafusion/sqllogictest/test_files/explain.slt
index deff793e51..ba2596551f 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -237,6 +237,7 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after coalesce_batches SAME TEXT AS ABOVE
physical_plan after OutputRequirements DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b,
c], file_type=csv, has_header=true
physical_plan after LimitAggregation SAME TEXT AS ABOVE
+physical_plan after PushdownFilter SAME TEXT AS ABOVE
physical_plan after LimitPushdown SAME TEXT AS ABOVE
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
@@ -313,6 +314,7 @@ physical_plan after OutputRequirements
01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8),
Bytes=Exact(671),
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671),
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
physical_plan after LimitAggregation SAME TEXT AS ABOVE
+physical_plan after PushdownFilter SAME TEXT AS ABOVE
physical_plan after LimitPushdown DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671),
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
@@ -353,6 +355,7 @@ physical_plan after OutputRequirements
01)GlobalLimitExec: skip=0, fetch=10
02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet
physical_plan after LimitAggregation SAME TEXT AS ABOVE
+physical_plan after PushdownFilter SAME TEXT AS ABOVE
physical_plan after LimitPushdown DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
diff --git a/datafusion/sqllogictest/test_files/simplify_expr.slt
b/datafusion/sqllogictest/test_files/simplify_expr.slt
index 9985ab49c2..075ccafcfd 100644
--- a/datafusion/sqllogictest/test_files/simplify_expr.slt
+++ b/datafusion/sqllogictest/test_files/simplify_expr.slt
@@ -107,4 +107,3 @@ query B
SELECT a / NULL::DECIMAL(4,3) > 1.2::decimal(2,1) FROM VALUES (1) AS t(a);
----
NULL
-
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]