This is an automated email from the ASF dual-hosted git repository.

adriangb pushed a commit to branch refactor-test-scan
in repository https://gitbox.apache.org/repos/asf/datafusion.git
commit 7066aadcf8e566070ceccb9b071fd1db3a800843
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Mon Jan 26 08:30:23 2026 -0500

    refactor: extract pushdown test utilities to shared module

    Move TestSource, TestOpener, TestScanBuilder, OptimizationTest and related
    utilities from filter_pushdown/util.rs to a new shared pushdown_utils.rs
    module. This allows these utilities to be reused by other pushdown tests
    like projection_pushdown.

    Also update TestOpener and TestSource to use ProjectionExprs instead of
    Vec<usize> for projections, enabling support for complex projection
    expressions (e.g. get_field).

    Co-Authored-By: Claude Haiku 4.5 <[email protected]>
---
 .../tests/physical_optimizer/pushdown_utils.rs | 561 +++++++++++++++++++++
 1 file changed, 561 insertions(+)

diff --git a/datafusion/core/tests/physical_optimizer/pushdown_utils.rs b/datafusion/core/tests/physical_optimizer/pushdown_utils.rs
new file mode 100644
index 0000000000..524d33ae6e
--- /dev/null
+++ b/datafusion/core/tests/physical_optimizer/pushdown_utils.rs
@@ -0,0 +1,561 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::SchemaRef;
+use arrow::{array::RecordBatch, compute::concat_batches};
+use datafusion::{datasource::object_store::ObjectStoreUrl, physical_plan::PhysicalExpr};
+use datafusion_common::{Result, config::ConfigOptions, internal_err};
+use datafusion_datasource::{
+    PartitionedFile, file::FileSource, file_scan_config::FileScanConfig,
+    file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture,
+    file_stream::FileOpener, source::DataSourceExec,
+};
+use datafusion_physical_expr::projection::ProjectionExprs;
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::filter::batch_filter;
+use datafusion_physical_plan::filter_pushdown::{FilterPushdownPhase, PushedDown};
+use datafusion_physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, displayable,
+    filter::FilterExec,
+    filter_pushdown::{
+        ChildFilterDescription, ChildPushdownResult, FilterDescription,
+        FilterPushdownPropagation,
+    },
+    metrics::ExecutionPlanMetricsSet,
+};
+use futures::StreamExt;
+use futures::{FutureExt, Stream};
+use object_store::ObjectStore;
+use std::{
+    any::Any,
+    fmt::{Display, Formatter},
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
+pub struct TestOpener {
+    batches: Vec<RecordBatch>,
+    batch_size: Option<usize>,
+    projection: Option<ProjectionExprs>,
+    predicate: Option<Arc<dyn PhysicalExpr>>,
+}
+
+impl FileOpener for TestOpener {
+    fn open(&self, _partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
+        let mut batches = self.batches.clone();
+        if self.batches.is_empty() {
+            return Ok((async { Ok(TestStream::new(vec![]).boxed()) }).boxed());
+        }
+        let schema = self.batches[0].schema();
+        if let Some(batch_size) = self.batch_size {
+            let batch = concat_batches(&batches[0].schema(), &batches)?;
+            let mut new_batches = Vec::new();
+            for i in (0..batch.num_rows()).step_by(batch_size) {
+                let end = std::cmp::min(i + batch_size, batch.num_rows());
+                let batch = batch.slice(i, end - i);
+                new_batches.push(batch);
+            }
+            batches = new_batches.into_iter().collect();
+        }
+
+        let mut new_batches = Vec::new();
+        for batch in batches {
+            let batch = if let Some(predicate) = &self.predicate {
+                batch_filter(&batch, predicate)?
+            } else {
+                batch
+            };
+            new_batches.push(batch);
+        }
+        batches = new_batches;
+
+        if let Some(projection) = &self.projection {
+            let projector = projection.make_projector(&schema)?;
+            batches = batches
+                .into_iter()
+                .map(|batch| projector.project_batch(&batch).unwrap())
+                .collect();
+        }
+
+        let stream = TestStream::new(batches);
+
+        Ok((async { Ok(stream.boxed()) }).boxed())
+    }
+}
+
+/// A placeholder data source that accepts filter pushdown
+#[derive(Clone)]
+pub struct TestSource {
+    support: bool,
+    predicate: Option<Arc<dyn PhysicalExpr>>,
+    batch_size: Option<usize>,
+    batches: Vec<RecordBatch>,
+    metrics: ExecutionPlanMetricsSet,
+    projection: Option<ProjectionExprs>,
+    table_schema: datafusion_datasource::TableSchema,
+}
+
+impl TestSource {
+    pub fn new(schema: SchemaRef, support: bool, batches: Vec<RecordBatch>) -> Self {
+        let table_schema = datafusion_datasource::TableSchema::new(schema, vec![]);
+        Self {
+            support,
+            metrics: ExecutionPlanMetricsSet::new(),
+            batches,
+            predicate: None,
+            batch_size: None,
+            projection: None,
+            table_schema,
+        }
+    }
+}
+
+impl FileSource for TestSource {
+    fn create_file_opener(
+        &self,
+        _object_store: Arc<dyn ObjectStore>,
+        _base_config: &FileScanConfig,
+        _partition: usize,
+    ) -> Result<Arc<dyn FileOpener>> {
+        Ok(Arc::new(TestOpener {
+            batches: self.batches.clone(),
+            batch_size: self.batch_size,
+            projection: self.projection.clone(),
+            predicate: self.predicate.clone(),
+        }))
+    }
+
+    fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
+        self.predicate.clone()
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        todo!("should not be called")
+    }
+
+    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
+        Arc::new(TestSource {
+            batch_size: Some(batch_size),
+            ..self.clone()
+        })
+    }
+
+    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+        &self.metrics
+    }
+
+    fn file_type(&self) -> &str {
+        "test"
+    }
+
+    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                let support = format!(", pushdown_supported={}", self.support);
+
+                let predicate_string = self
+                    .predicate
+                    .as_ref()
+                    .map(|p| format!(", predicate={p}"))
+                    .unwrap_or_default();
+
+                write!(f, "{support}{predicate_string}")
+            }
+            DisplayFormatType::TreeRender => {
+                if let Some(predicate) = &self.predicate {
+                    writeln!(f, "pushdown_supported={}", fmt_sql(predicate.as_ref()))?;
+                    writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
+                }
+                Ok(())
+            }
+        }
+    }
+
+    fn try_pushdown_filters(
+        &self,
+        mut filters: Vec<Arc<dyn PhysicalExpr>>,
+        config: &ConfigOptions,
+    ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
+        if self.support && config.execution.parquet.pushdown_filters {
+            if let Some(internal) = self.predicate.as_ref() {
+                filters.push(Arc::clone(internal));
+            }
+            let new_node = Arc::new(TestSource {
+                predicate: datafusion_physical_expr::utils::conjunction_opt(
+                    filters.clone(),
+                ),
+                ..self.clone()
+            });
+            Ok(FilterPushdownPropagation::with_parent_pushdown_result(
+                vec![PushedDown::Yes; filters.len()],
+            )
+            .with_updated_node(new_node))
+        } else {
+            Ok(FilterPushdownPropagation::with_parent_pushdown_result(
+                vec![PushedDown::No; filters.len()],
+            ))
+        }
+    }
+
+    fn try_pushdown_projection(
+        &self,
+        projection: &ProjectionExprs,
+    ) -> Result<Option<Arc<dyn FileSource>>> {
+        if let Some(existing_projection) = &self.projection {
+            // Combine existing projection with new projection
+            let combined_projection =
+                existing_projection.try_merge(projection)?;
+            Ok(Some(Arc::new(TestSource {
+                projection: Some(combined_projection),
+                table_schema: self.table_schema.clone(),
+                ..self.clone()
+            })))
+        } else {
+            Ok(Some(Arc::new(TestSource {
+                projection: Some(projection.clone()),
+                ..self.clone()
+            })))
+        }
+    }
+
+    fn projection(&self) -> Option<&ProjectionExprs> {
+        self.projection.as_ref()
+    }
+
+    fn table_schema(&self) -> &datafusion_datasource::TableSchema {
+        &self.table_schema
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct TestScanBuilder {
+    support: bool,
+    batches: Vec<RecordBatch>,
+    schema: SchemaRef,
+}
+
+impl TestScanBuilder {
+    pub fn new(schema: SchemaRef) -> Self {
+        Self {
+            support: false,
+            batches: vec![],
+            schema,
+        }
+    }
+
+    pub fn with_support(mut self, support: bool) -> Self {
+        self.support = support;
+        self
+    }
+
+    pub fn with_batches(mut self, batches: Vec<RecordBatch>) -> Self {
+        self.batches = batches;
+        self
+    }
+
+    pub fn build(self) -> Arc<dyn ExecutionPlan> {
+        let source = Arc::new(TestSource::new(
+            Arc::clone(&self.schema),
+            self.support,
+            self.batches,
+        ));
+        let base_config =
+            FileScanConfigBuilder::new(ObjectStoreUrl::parse("test://").unwrap(), source)
+                .with_file(PartitionedFile::new("test.parquet", 123))
+                .build();
+        DataSourceExec::from_data_source(base_config)
+    }
+}
+
+/// Index into the data that has been returned so far
+#[derive(Debug, Default, Clone)]
+pub struct BatchIndex {
+    inner: Arc<std::sync::Mutex<usize>>,
+}
+
+impl BatchIndex {
+    /// Return the current index
+    pub fn value(&self) -> usize {
+        let inner = self.inner.lock().unwrap();
+        *inner
+    }
+
+    // increment the current index by one
+    pub fn incr(&self) {
+        let mut inner = self.inner.lock().unwrap();
+        *inner += 1;
+    }
+}
+
+/// Iterator over batches
+#[derive(Debug, Default)]
+pub struct TestStream {
+    /// Vector of record batches
+    data: Vec<RecordBatch>,
+    /// Index into the data that has been returned so far
+    index: BatchIndex,
+}
+
+impl TestStream {
+    /// Create an iterator for a vector of record batches. Assumes at
+    /// least one entry in data (for the schema)
+    pub fn new(data: Vec<RecordBatch>) -> Self {
+        // check that there is at least one entry in data and that all batches have the same schema
+        if let Some(first) = data.first() {
+            assert!(
+                data.iter().all(|batch| batch.schema() == first.schema()),
+                "all batches must have the same schema"
+            );
+        }
+        Self {
+            data,
+            ..Default::default()
+        }
+    }
+}
+
+impl Stream for TestStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let next_batch = self.index.value();
+
+        Poll::Ready(if next_batch < self.data.len() {
+            let next_batch = self.index.value();
+            self.index.incr();
+            Some(Ok(self.data[next_batch].clone()))
+        } else {
+            None
+        })
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (self.data.len(), Some(self.data.len()))
+    }
+}
+
+/// A harness for testing physical optimizers.
+///
+/// You can use this to test the output of a physical optimizer rule using insta snapshots
+#[derive(Debug)]
+pub struct OptimizationTest {
+    input: Vec<String>,
+    output: Result<Vec<String>, String>,
+}
+
+impl OptimizationTest {
+    #[expect(clippy::needless_pass_by_value)]
+    pub fn new<O>(
+        input_plan: Arc<dyn ExecutionPlan>,
+        opt: O,
+        allow_pushdown_filters: bool,
+    ) -> Self
+    where
+        O: PhysicalOptimizerRule,
+    {
+        let mut parquet_pushdown_config = ConfigOptions::default();
+        parquet_pushdown_config.execution.parquet.pushdown_filters =
+            allow_pushdown_filters;
+
+        let input = format_execution_plan(&input_plan);
+        let input_schema = input_plan.schema();
+
+        let output_result = opt.optimize(input_plan, &parquet_pushdown_config);
+        let output = output_result
+            .and_then(|plan| {
+                if opt.schema_check() && (plan.schema() != input_schema) {
+                    internal_err!(
+                        "Schema mismatch:\n\nBefore:\n{:?}\n\nAfter:\n{:?}",
+                        input_schema,
+                        plan.schema()
+                    )
+                } else {
+                    Ok(plan)
+                }
+            })
+            .map(|plan| format_execution_plan(&plan))
+            .map_err(|e| e.to_string());
+
+        Self { input, output }
+    }
+}
+
+impl Display for OptimizationTest {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        writeln!(f, "OptimizationTest:")?;
+        writeln!(f, "  input:")?;
+        for line in &self.input {
+            writeln!(f, "    - {line}")?;
+        }
+        writeln!(f, "  output:")?;
+        match &self.output {
+            Ok(output) => {
+                writeln!(f, "    Ok:")?;
+                for line in output {
+                    writeln!(f, "      - {line}")?;
+                }
+            }
+            Err(err) => {
+                writeln!(f, "    Err: {err}")?;
+            }
+        }
+        Ok(())
+    }
+}
+
+pub fn format_execution_plan(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
+    format_lines(&displayable(plan.as_ref()).indent(false).to_string())
+}
+
+fn format_lines(s: &str) -> Vec<String> {
+    s.trim().split('\n').map(|s| s.to_string()).collect()
+}
+
+pub fn format_plan_for_test(plan: &Arc<dyn ExecutionPlan>) -> String {
+    let mut out = String::new();
+    for line in format_execution_plan(plan) {
+        out.push_str(&format!("  - {line}\n"));
+    }
+    out.push('\n');
+    out
+}
+
+#[derive(Debug)]
+pub(crate) struct TestNode {
+    inject_filter: bool,
+    input: Arc<dyn ExecutionPlan>,
+    predicate: Arc<dyn PhysicalExpr>,
+}
+
+impl TestNode {
+    pub fn new(
+        inject_filter: bool,
+        input: Arc<dyn ExecutionPlan>,
+        predicate: Arc<dyn PhysicalExpr>,
+    ) -> Self {
+        Self {
+            inject_filter,
+            input,
+            predicate,
+        }
+    }
+}
+
+impl DisplayAs for TestNode {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        write!(
+            f,
+            "TestInsertExec {{ inject_filter: {} }}",
+            self.inject_filter
+        )
+    }
+}
+
+impl ExecutionPlan for TestNode {
+    fn name(&self) -> &str {
+        "TestInsertExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        self.input.properties()
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        assert!(children.len() == 1);
+        Ok(Arc::new(TestNode::new(
+            self.inject_filter,
+            children[0].clone(),
+            self.predicate.clone(),
+        )))
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<datafusion_execution::TaskContext>,
+    ) -> Result<datafusion_execution::SendableRecordBatchStream> {
+        unimplemented!("TestInsertExec is a stub for testing.")
+    }
+
+    fn gather_filters_for_pushdown(
+        &self,
+        _phase: FilterPushdownPhase,
+        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+        _config: &ConfigOptions,
+    ) -> Result<FilterDescription> {
+        // Since TestNode marks all parent filters as supported and adds its own filter,
+        // we use from_child to create a description with all parent filters supported
+        let child = &self.input;
+        let child_desc = ChildFilterDescription::from_child(&parent_filters, child)?
+            .with_self_filter(Arc::clone(&self.predicate));
+        Ok(FilterDescription::new().with_child(child_desc))
+    }
+
+    fn handle_child_pushdown_result(
+        &self,
+        _phase: FilterPushdownPhase,
+        child_pushdown_result: ChildPushdownResult,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+        if self.inject_filter {
+            // Add a FilterExec if our own filter was not handled by the child
+
+            // We have 1 child
+            assert_eq!(child_pushdown_result.self_filters.len(), 1);
+            let self_pushdown_result = child_pushdown_result.self_filters[0].clone();
+            // And pushed down 1 filter
+            assert_eq!(self_pushdown_result.len(), 1);
+            let self_pushdown_result: Vec<_> = self_pushdown_result.into_iter().collect();
+
+            let first_pushdown_result = self_pushdown_result[0].clone();
+
+            match &first_pushdown_result.discriminant {
+                PushedDown::No => {
+                    // We have a filter to push down
+                    let new_child = FilterExec::try_new(
+                        Arc::clone(&first_pushdown_result.predicate),
+                        Arc::clone(&self.input),
+                    )?;
+                    let new_self =
+                        TestNode::new(false, Arc::new(new_child), self.predicate.clone());
+                    let mut res =
+                        FilterPushdownPropagation::if_all(child_pushdown_result);
+                    res.updated_node = Some(Arc::new(new_self) as Arc<dyn ExecutionPlan>);
+                    Ok(res)
+                }
+                PushedDown::Yes => {
+                    let res = FilterPushdownPropagation::if_all(child_pushdown_result);
+                    Ok(res)
+                }
+            }
+        } else {
+            let res = FilterPushdownPropagation::if_all(child_pushdown_result);
+            Ok(res)
+        }
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
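For readers unfamiliar with these utilities, here is a minimal sketch of how a pushdown test might drive them; it is not part of the commit above. It assumes the FilterPushdown rule from datafusion_physical_optimizer::filter_pushdown, the col/lit/BinaryExpr helpers from datafusion_physical_expr::expressions, datafusion_expr::Operator, and a crate::physical_optimizer::pushdown_utils module path matching the new file; adjust names to the actual test tree.

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, col, lit};
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion_physical_plan::{ExecutionPlan, filter::FilterExec};

// Hypothetical consumer of the shared utilities; the module path is assumed.
use crate::physical_optimizer::pushdown_utils::{OptimizationTest, TestScanBuilder};

#[test]
fn example_filter_pushdown_into_test_scan() -> Result<()> {
    // Schema for the scan; `.with_support(true)` makes the underlying
    // TestSource advertise filter pushdown support.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Int32, false),
    ]));
    let scan = TestScanBuilder::new(Arc::clone(&schema))
        .with_support(true)
        .build();

    // FilterExec(a = 'foo') sitting on top of the scan.
    let predicate = Arc::new(BinaryExpr::new(
        col("a", &schema)?,
        Operator::Eq,
        lit(ScalarValue::from("foo")),
    ));
    let plan: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(predicate, scan)?);

    // OptimizationTest renders the plan before and after the rule runs; its
    // Display output is what the filter_pushdown tests snapshot with insta.
    let test = OptimizationTest::new(plan, FilterPushdown::new(), true);
    println!("{test}");
    Ok(())
}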
