This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new be42f3d17c Return TableProviderFilterPushDown::Exact when Parquet 
Pushdown Enabled (#12135)
be42f3d17c is described below

commit be42f3d17c792cd31d49ce06a9a95d87b76ef94a
Author: June <[email protected]>
AuthorDate: Tue Sep 17 10:23:58 2024 -0600

    Return TableProviderFilterPushDown::Exact when Parquet Pushdown Enabled 
(#12135)
    
    * feat: Preemptively filter for pushdown-preventing columns in ListingTable
    
    * Fix behavior to make all previous tests work and lay groundwork for 
future tests
    
    * fix: Add some more tests and fix small issue with pushdown specificity
    
    * test: Revive unnecessarily removed test
    
    * ci: Fix CI issues with different combinations of exprs
    
    * fix: run fmt
    
    * Fix doc publicity issues
    
    * Add ::new fn for PushdownChecker
    
    * Remove unnecessary 'pub' qualifier
    
    * Fix naming and doc comment of non_pushdown_columns to reflect what it 
actually does (the opposite) and add back useful comments
    
    * fmt
    
    * Extend FileFormat trait to allow library users to define formats which 
support pushdown
    
    * fmt
    
    * fix: reference real fn in doc to fix CI
    
    * Minor: Add tests for using FilterExec when parquet was pushed down
    
    * Update datafusion/core/src/datasource/file_format/mod.rs
    
    * Pipe schema information through to TableScan and ParquetExec to 
facilitate unnecessary FilterExec removal
    
    * - Remove collect::<(_, _)> to satisfy msrv
    - Remove expect(_) attr to satisfy msrv
    - Update comments with more accurate details and explanations
    
    * Add more details in comments for `map_partial_batch`
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Remove reference to issue #4028 as it will be closed
    
    * Convert normal comments to doc-comments
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Clarify meaning of word `projected` in comment
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Clarify more how `table_schema` is used differently from 
`projected_table_schema`
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Finish partially-written comment about SchemaMapping struct
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/core/src/datasource/file_format/mod.rs  |  28 ++
 .../core/src/datasource/file_format/parquet.rs     |  28 +-
 datafusion/core/src/datasource/listing/helpers.rs  | 110 ++++----
 datafusion/core/src/datasource/listing/table.rs    |  71 +++--
 .../datasource/physical_plan/file_scan_config.rs   |   2 +-
 .../core/src/datasource/physical_plan/mod.rs       |   5 +-
 .../src/datasource/physical_plan/parquet/mod.rs    |   4 +-
 .../src/datasource/physical_plan/parquet/opener.rs |   4 +-
 .../datasource/physical_plan/parquet/row_filter.rs | 312 ++++++++++++++++-----
 datafusion/core/src/datasource/schema_adapter.rs   | 147 ++++++++--
 datafusion/core/src/physical_planner.rs            |   2 +-
 datafusion/expr-common/src/signature.rs            |   4 +-
 datafusion/expr/src/logical_plan/plan.rs           |  14 +-
 .../optimizer/src/optimize_projections/mod.rs      |  12 +-
 .../src/optimize_projections/required_indices.rs   |  15 +-
 datafusion/physical-expr/src/expressions/binary.rs |   4 +-
 datafusion/physical-expr/src/expressions/column.rs |   5 +-
 datafusion/proto/src/logical_plan/mod.rs           |  15 +-
 datafusion/proto/src/physical_plan/mod.rs          | 116 ++++----
 .../join_disable_repartition_joins.slt.temp        |  26 ++
 .../test_files/parquet_filter_pushdown.slt         |  28 +-
 datafusion/sqllogictest/test_files/repartition.slt |   2 +-
 datafusion/sqllogictest/test_files/select.slt      |   2 +-
 datafusion/sqllogictest/test_files/string_view.slt |   1 -
 datafusion/sqllogictest/test_files/window.slt      |   2 +-
 25 files changed, 662 insertions(+), 297 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/mod.rs 
b/datafusion/core/src/datasource/file_format/mod.rs
index 1dcf480cf4..a503e36adb 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -45,6 +45,7 @@ use crate::physical_plan::{ExecutionPlan, Statistics};
 use arrow_schema::{DataType, Field, Schema};
 use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::{internal_err, not_impl_err, GetExt};
+use datafusion_expr::Expr;
 use datafusion_physical_expr::PhysicalExpr;
 
 use async_trait::async_trait;
@@ -138,6 +139,33 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         not_impl_err!("Writer not implemented for this format")
     }
+
+    /// Check if the specified file format has support for pushing down the 
provided filters within
+    /// the given schemas. Added initially to support the Parquet file 
format's ability to do this.
+    fn supports_filters_pushdown(
+        &self,
+        _file_schema: &Schema,
+        _table_schema: &Schema,
+        _filters: &[&Expr],
+    ) -> Result<FilePushdownSupport> {
+        Ok(FilePushdownSupport::NoSupport)
+    }
+}
+
+/// An enum to distinguish between different states when determining if 
certain filters can be
+/// pushed down to file scanning
+#[derive(Debug, PartialEq)]
+pub enum FilePushdownSupport {
+    /// The file format/system being asked does not support any sort of 
pushdown. This should be
+    /// used even if the file format theoretically supports some sort of 
pushdown, but it's not
+    /// enabled or implemented yet.
+    NoSupport,
+    /// The file format/system being asked *does* support pushdown, but it 
can't make it work for
+    /// the provided filter/expression
+    NotSupportedForFilter,
+    /// The file format/system being asked *does* support pushdown and *can* 
make it work for the
+    /// provided filter/expression
+    Supported,
 }
 
 /// A container of [FileFormatFactory] which also implements [FileType].
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs 
b/datafusion/core/src/datasource/file_format/parquet.rs
index 2a862dd6dc..35296b0d79 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -26,7 +26,7 @@ use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, SharedBuffer};
 use super::{
     coerce_file_schema_to_view_type, transform_schema_to_view, FileFormat,
-    FileFormatFactory, FileScanConfig,
+    FileFormatFactory, FilePushdownSupport, FileScanConfig,
 };
 use crate::arrow::array::RecordBatch;
 use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
@@ -53,6 +53,7 @@ use datafusion_common::{
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, 
MemoryReservation};
 use datafusion_execution::TaskContext;
+use datafusion_expr::Expr;
 use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
 use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_plan::metrics::MetricsSet;
@@ -78,7 +79,9 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
 use tokio::sync::mpsc::{self, Receiver, Sender};
 use tokio::task::JoinSet;
 
-use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
+use crate::datasource::physical_plan::parquet::{
+    can_expr_be_pushed_down_with_schemas, ParquetExecBuilder,
+};
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
 use futures::{StreamExt, TryStreamExt};
 use object_store::path::Path;
@@ -414,6 +417,27 @@ impl FileFormat for ParquetFormat {
             order_requirements,
         )) as _)
     }
+
+    fn supports_filters_pushdown(
+        &self,
+        file_schema: &Schema,
+        table_schema: &Schema,
+        filters: &[&Expr],
+    ) -> Result<FilePushdownSupport> {
+        if !self.options().global.pushdown_filters {
+            return Ok(FilePushdownSupport::NoSupport);
+        }
+
+        let all_supported = filters.iter().all(|filter| {
+            can_expr_be_pushed_down_with_schemas(filter, file_schema, 
table_schema)
+        });
+
+        Ok(if all_supported {
+            FilePushdownSupport::Supported
+        } else {
+            FilePushdownSupport::NotSupportedForFilter
+        })
+    }
 }
 
 /// Fetches parquet metadata from ObjectStore for given object
diff --git a/datafusion/core/src/datasource/listing/helpers.rs 
b/datafusion/core/src/datasource/listing/helpers.rs
index 33a16237e1..72d7277d6a 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -53,66 +53,64 @@ use object_store::{ObjectMeta, ObjectStore};
 ///   was performed
 pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
     let mut is_applicable = true;
-    expr.apply(|expr| {
-        match expr {
-            Expr::Column(Column { ref name, .. }) => {
-                is_applicable &= col_names.contains(&name.as_str());
-                if is_applicable {
-                    Ok(TreeNodeRecursion::Jump)
-                } else {
-                    Ok(TreeNodeRecursion::Stop)
-                }
+    expr.apply(|expr| match expr {
+        Expr::Column(Column { ref name, .. }) => {
+            is_applicable &= col_names.contains(&name.as_str());
+            if is_applicable {
+                Ok(TreeNodeRecursion::Jump)
+            } else {
+                Ok(TreeNodeRecursion::Stop)
             }
-            Expr::Literal(_)
-            | Expr::Alias(_)
-            | Expr::OuterReferenceColumn(_, _)
-            | Expr::ScalarVariable(_, _)
-            | Expr::Not(_)
-            | Expr::IsNotNull(_)
-            | Expr::IsNull(_)
-            | Expr::IsTrue(_)
-            | Expr::IsFalse(_)
-            | Expr::IsUnknown(_)
-            | Expr::IsNotTrue(_)
-            | Expr::IsNotFalse(_)
-            | Expr::IsNotUnknown(_)
-            | Expr::Negative(_)
-            | Expr::Cast { .. }
-            | Expr::TryCast { .. }
-            | Expr::BinaryExpr { .. }
-            | Expr::Between { .. }
-            | Expr::Like { .. }
-            | Expr::SimilarTo { .. }
-            | Expr::InList { .. }
-            | Expr::Exists { .. }
-            | Expr::InSubquery(_)
-            | Expr::ScalarSubquery(_)
-            | Expr::GroupingSet(_)
-            | Expr::Case { .. } => Ok(TreeNodeRecursion::Continue),
-
-            Expr::ScalarFunction(scalar_function) => {
-                match scalar_function.func.signature().volatility {
-                    Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
-                    // TODO: Stable functions could be `applicable`, but that 
would require access to the context
-                    Volatility::Stable | Volatility::Volatile => {
-                        is_applicable = false;
-                        Ok(TreeNodeRecursion::Stop)
-                    }
+        }
+        Expr::Literal(_)
+        | Expr::Alias(_)
+        | Expr::OuterReferenceColumn(_, _)
+        | Expr::ScalarVariable(_, _)
+        | Expr::Not(_)
+        | Expr::IsNotNull(_)
+        | Expr::IsNull(_)
+        | Expr::IsTrue(_)
+        | Expr::IsFalse(_)
+        | Expr::IsUnknown(_)
+        | Expr::IsNotTrue(_)
+        | Expr::IsNotFalse(_)
+        | Expr::IsNotUnknown(_)
+        | Expr::Negative(_)
+        | Expr::Cast(_)
+        | Expr::TryCast(_)
+        | Expr::BinaryExpr(_)
+        | Expr::Between(_)
+        | Expr::Like(_)
+        | Expr::SimilarTo(_)
+        | Expr::InList(_)
+        | Expr::Exists(_)
+        | Expr::InSubquery(_)
+        | Expr::ScalarSubquery(_)
+        | Expr::GroupingSet(_)
+        | Expr::Case(_) => Ok(TreeNodeRecursion::Continue),
+
+        Expr::ScalarFunction(scalar_function) => {
+            match scalar_function.func.signature().volatility {
+                Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
+                // TODO: Stable functions could be `applicable`, but that 
would require access to the context
+                Volatility::Stable | Volatility::Volatile => {
+                    is_applicable = false;
+                    Ok(TreeNodeRecursion::Stop)
                 }
             }
+        }
 
-            // TODO other expressions are not handled yet:
-            // - AGGREGATE and WINDOW should not end up in filter conditions, 
except maybe in some edge cases
-            // - Can `Wildcard` be considered as a `Literal`?
-            // - ScalarVariable could be `applicable`, but that would require 
access to the context
-            Expr::AggregateFunction { .. }
-            | Expr::WindowFunction { .. }
-            | Expr::Wildcard { .. }
-            | Expr::Unnest { .. }
-            | Expr::Placeholder(_) => {
-                is_applicable = false;
-                Ok(TreeNodeRecursion::Stop)
-            }
+        // TODO other expressions are not handled yet:
+        // - AGGREGATE and WINDOW should not end up in filter conditions, 
except maybe in some edge cases
+        // - Can `Wildcard` be considered as a `Literal`?
+        // - ScalarVariable could be `applicable`, but that would require 
access to the context
+        Expr::AggregateFunction { .. }
+        | Expr::WindowFunction { .. }
+        | Expr::Wildcard { .. }
+        | Expr::Unnest { .. }
+        | Expr::Placeholder(_) => {
+            is_applicable = false;
+            Ok(TreeNodeRecursion::Stop)
         }
     })
     .unwrap();
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index adf907011b..3541a8ff21 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -18,16 +18,17 @@
 //! The table implementation.
 
 use std::collections::HashMap;
-use std::str::FromStr;
-use std::{any::Any, sync::Arc};
+use std::{any::Any, str::FromStr, sync::Arc};
 
 use super::helpers::{expr_applicable_for_cols, pruned_partition_list, 
split_files};
-use super::PartitionedFile;
+use super::{ListingTableUrl, PartitionedFile};
 
-use super::ListingTableUrl;
-use crate::datasource::{create_ordering, get_statistics_with_limit};
 use crate::datasource::{
-    file_format::{file_compression_type::FileCompressionType, FileFormat},
+    create_ordering,
+    file_format::{
+        file_compression_type::FileCompressionType, FileFormat, 
FilePushdownSupport,
+    },
+    get_statistics_with_limit,
     physical_plan::{FileScanConfig, FileSinkConfig},
 };
 use crate::execution::context::SessionState;
@@ -43,8 +44,9 @@ use datafusion_common::{
     config_datafusion_err, internal_err, plan_err, project_schema, Constraints,
     SchemaExt, ToDFSchema,
 };
-use datafusion_execution::cache::cache_manager::FileStatisticsCache;
-use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
+use datafusion_execution::cache::{
+    cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache,
+};
 use datafusion_physical_expr::{
     create_physical_expr, LexOrdering, PhysicalSortRequirement,
 };
@@ -817,19 +819,22 @@ impl TableProvider for ListingTable {
             .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
             .collect::<Result<Vec<_>>>()?;
 
-        let filters = if let Some(expr) = conjunction(filters.to_vec()) {
-            // NOTE: Use the table schema (NOT file schema) here because 
`expr` may contain references to partition columns.
-            let table_df_schema = 
self.table_schema.as_ref().clone().to_dfschema()?;
-            let filters =
-                create_physical_expr(&expr, &table_df_schema, 
state.execution_props())?;
-            Some(filters)
-        } else {
-            None
-        };
+        let filters = conjunction(filters.to_vec())
+            .map(|expr| -> Result<_> {
+                // NOTE: Use the table schema (NOT file schema) here because 
`expr` may contain references to partition columns.
+                let table_df_schema = 
self.table_schema.as_ref().clone().to_dfschema()?;
+                let filters = create_physical_expr(
+                    &expr,
+                    &table_df_schema,
+                    state.execution_props(),
+                )?;
+                Ok(Some(filters))
+            })
+            .unwrap_or(Ok(None))?;
 
-        let object_store_url = if let Some(url) = self.table_paths.first() {
-            url.object_store()
-        } else {
+        let Some(object_store_url) =
+            self.table_paths.first().map(ListingTableUrl::object_store)
+        else {
             return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
         };
 
@@ -854,7 +859,7 @@ impl TableProvider for ListingTable {
         &self,
         filters: &[&Expr],
     ) -> Result<Vec<TableProviderFilterPushDown>> {
-        Ok(filters
+        filters
             .iter()
             .map(|filter| {
                 if expr_applicable_for_cols(
@@ -862,19 +867,29 @@ impl TableProvider for ListingTable {
                         .options
                         .table_partition_cols
                         .iter()
-                        .map(|x| x.0.as_str())
+                        .map(|col| col.0.as_str())
                         .collect::<Vec<_>>(),
                     filter,
                 ) {
                     // if filter can be handled by partition pruning, it is 
exact
-                    TableProviderFilterPushDown::Exact
-                } else {
-                    // otherwise, we still might be able to handle the filter 
with file
-                    // level mechanisms such as Parquet row group pruning.
-                    TableProviderFilterPushDown::Inexact
+                    return Ok(TableProviderFilterPushDown::Exact);
+                }
+
+                // if we can't push it down completely with only the 
filename-based/path-based
+                // column names, then we should check if we can do parquet 
predicate pushdown
+                let supports_pushdown = 
self.options.format.supports_filters_pushdown(
+                    &self.file_schema,
+                    &self.table_schema,
+                    &[filter],
+                )?;
+
+                if supports_pushdown == FilePushdownSupport::Supported {
+                    return Ok(TableProviderFilterPushDown::Exact);
                 }
+
+                Ok(TableProviderFilterPushDown::Inexact)
             })
-            .collect())
+            .collect()
     }
 
     fn get_table_definition(&self) -> Option<&str> {
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs 
b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 9f67418569..2c438e8b0e 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -258,7 +258,7 @@ impl FileScanConfig {
         (projected_schema, table_stats, projected_output_ordering)
     }
 
-    #[allow(unused)] // Only used by avro
+    #[cfg_attr(not(feature = "avro"), allow(unused))] // Only used by avro
     pub(crate) fn projected_file_column_names(&self) -> Option<Vec<String>> {
         self.projection.as_ref().map(|p| {
             p.iter()
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs 
b/datafusion/core/src/datasource/physical_plan/mod.rs
index f810fb86bd..4018b3bb29 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -516,7 +516,8 @@ mod tests {
             Field::new("c3", DataType::Float64, true),
         ]));
 
-        let adapter = 
DefaultSchemaAdapterFactory::default().create(table_schema.clone());
+        let adapter = DefaultSchemaAdapterFactory
+            .create(table_schema.clone(), table_schema.clone());
 
         let file_schema = Schema::new(vec![
             Field::new("c1", DataType::Utf8, true),
@@ -573,7 +574,7 @@ mod tests {
 
         let indices = vec![1, 2, 4];
         let schema = SchemaRef::from(table_schema.project(&indices).unwrap());
-        let adapter = DefaultSchemaAdapterFactory::default().create(schema);
+        let adapter = DefaultSchemaAdapterFactory.create(schema, 
table_schema.clone());
         let (mapping, projection) = adapter.map_schema(&file_schema).unwrap();
 
         let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 54d4d7262a..f22d02699a 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -61,6 +61,7 @@ pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
 pub use metrics::ParquetFileMetrics;
 use opener::ParquetOpener;
 pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
+pub use row_filter::can_expr_be_pushed_down_with_schemas;
 pub use writer::plan_to_parquet;
 
 /// Execution plan for reading one or more Parquet files.
@@ -405,6 +406,7 @@ impl ParquetExecBuilder {
 
         let (projected_schema, projected_statistics, 
projected_output_ordering) =
             base_config.project();
+
         let cache = ParquetExec::compute_properties(
             projected_schema,
             &projected_output_ordering,
@@ -707,7 +709,7 @@ impl ExecutionPlan for ParquetExec {
         let schema_adapter_factory = self
             .schema_adapter_factory
             .clone()
-            .unwrap_or_else(|| 
Arc::new(DefaultSchemaAdapterFactory::default()));
+            .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory));
 
         let opener = ParquetOpener {
             partition_index,
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
index 2a198c3d45..9880c30ddb 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -99,7 +99,9 @@ impl FileOpener for ParquetOpener {
 
         let projected_schema =
             SchemaRef::from(self.table_schema.project(&self.projection)?);
-        let schema_adapter = 
self.schema_adapter_factory.create(projected_schema);
+        let schema_adapter = self
+            .schema_adapter_factory
+            .create(projected_schema, self.table_schema.clone());
         let predicate = self.predicate.clone();
         let pruning_predicate = self.pruning_predicate.clone();
         let page_pruning_predicate = self.page_pruning_predicate.clone();
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
index 59d23fd68c..d3bc8030cf 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
@@ -76,7 +76,7 @@ use datafusion_common::cast::as_boolean_array;
 use datafusion_common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion, 
TreeNodeRewriter,
 };
-use datafusion_common::{arrow_err, DataFusionError, Result, ScalarValue};
+use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, 
ScalarValue};
 use datafusion_physical_expr::expressions::{Column, Literal};
 use datafusion_physical_expr::utils::reassign_predicate_columns;
 use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
@@ -237,12 +237,6 @@ struct FilterCandidateBuilder<'a> {
     /// The schema of the table (merged schema) -- columns may be in different
     /// order than in the file and have columns that are not in the file schema
     table_schema: &'a Schema,
-    required_column_indices: BTreeSet<usize>,
-    /// Does the expression require any non-primitive columns (like structs)?
-    non_primitive_columns: bool,
-    /// Does the expression reference any columns that are in the table
-    /// schema but not in the file schema?
-    projected_columns: bool,
 }
 
 impl<'a> FilterCandidateBuilder<'a> {
@@ -255,9 +249,6 @@ impl<'a> FilterCandidateBuilder<'a> {
             expr,
             file_schema,
             table_schema,
-            required_column_indices: BTreeSet::default(),
-            non_primitive_columns: false,
-            projected_columns: false,
         }
     }
 
@@ -268,53 +259,87 @@ impl<'a> FilterCandidateBuilder<'a> {
     /// * `Ok(Some(candidate))` if the expression can be used as an ArrowFilter
     /// * `Ok(None)` if the expression cannot be used as an ArrowFilter
     /// * `Err(e)` if an error occurs while building the candidate
-    pub fn build(
-        mut self,
-        metadata: &ParquetMetaData,
-    ) -> Result<Option<FilterCandidate>> {
-        let expr = self.expr.clone().rewrite(&mut self).data()?;
-
-        if self.non_primitive_columns || self.projected_columns {
-            Ok(None)
-        } else {
-            let required_bytes =
-                size_of_columns(&self.required_column_indices, metadata)?;
-            let can_use_index = columns_sorted(&self.required_column_indices, 
metadata)?;
-
-            Ok(Some(FilterCandidate {
-                expr,
-                required_bytes,
-                can_use_index,
-                projection: self.required_column_indices.into_iter().collect(),
-            }))
+    pub fn build(self, metadata: &ParquetMetaData) -> 
Result<Option<FilterCandidate>> {
+        let Some((required_indices, rewritten_expr)) =
+            pushdown_columns(self.expr, self.file_schema, self.table_schema)?
+        else {
+            return Ok(None);
+        };
+
+        let required_bytes = size_of_columns(&required_indices, metadata)?;
+        let can_use_index = columns_sorted(&required_indices, metadata)?;
+
+        Ok(Some(FilterCandidate {
+            expr: rewritten_expr,
+            required_bytes,
+            can_use_index,
+            projection: required_indices.into_iter().collect(),
+        }))
+    }
+}
+
+// a struct that implements TreeNodeRewriter to traverse a PhysicalExpr tree 
structure to determine
+// if any column references in the expression would prevent it from being 
predicate-pushed-down.
+// if non_primitive_columns || projected_columns, it can't be pushed down.
+// can't be reused between calls to `rewrite`; each construction must be used 
only once.
+struct PushdownChecker<'schema> {
+    /// Does the expression require any non-primitive columns (like structs)?
+    non_primitive_columns: bool,
+    /// Does the expression reference any columns that are in the table
+    /// schema but not in the file schema?
+    projected_columns: bool,
+    // the indices of all the columns found within the given expression which 
exist inside the given
+    // [`file_schema`]
+    required_column_indices: BTreeSet<usize>,
+    file_schema: &'schema Schema,
+    table_schema: &'schema Schema,
+}
+
+impl<'schema> PushdownChecker<'schema> {
+    fn new(file_schema: &'schema Schema, table_schema: &'schema Schema) -> 
Self {
+        Self {
+            non_primitive_columns: false,
+            projected_columns: false,
+            required_column_indices: BTreeSet::default(),
+            file_schema,
+            table_schema,
         }
     }
+
+    fn check_single_column(&mut self, column_name: &str) -> 
Option<TreeNodeRecursion> {
+        if let Ok(idx) = self.file_schema.index_of(column_name) {
+            self.required_column_indices.insert(idx);
+
+            if DataType::is_nested(self.file_schema.field(idx).data_type()) {
+                self.non_primitive_columns = true;
+                return Some(TreeNodeRecursion::Jump);
+            }
+        } else if self.table_schema.index_of(column_name).is_err() {
+            // If the column does not exist in the (un-projected) table schema 
then
+            // it must be a projected column.
+            self.projected_columns = true;
+            return Some(TreeNodeRecursion::Jump);
+        }
+
+        None
+    }
+
+    #[inline]
+    fn prevents_pushdown(&self) -> bool {
+        self.non_primitive_columns || self.projected_columns
+    }
 }
 
-/// Implement the `TreeNodeRewriter` trait for `FilterCandidateBuilder` that
-/// walks the expression tree and rewrites it in preparation of becoming
-/// `FilterCandidate`.
-impl<'a> TreeNodeRewriter for FilterCandidateBuilder<'a> {
+impl<'schema> TreeNodeRewriter for PushdownChecker<'schema> {
     type Node = Arc<dyn PhysicalExpr>;
 
-    /// Called before visiting each child
     fn f_down(
         &mut self,
         node: Arc<dyn PhysicalExpr>,
     ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
         if let Some(column) = node.as_any().downcast_ref::<Column>() {
-            if let Ok(idx) = self.file_schema.index_of(column.name()) {
-                self.required_column_indices.insert(idx);
-
-                if 
DataType::is_nested(self.file_schema.field(idx).data_type()) {
-                    self.non_primitive_columns = true;
-                    return Ok(Transformed::new(node, false, 
TreeNodeRecursion::Jump));
-                }
-            } else if self.table_schema.index_of(column.name()).is_err() {
-                // If the column does not exist in the (un-projected) table 
schema then
-                // it must be a projected column.
-                self.projected_columns = true;
-                return Ok(Transformed::new(node, false, 
TreeNodeRecursion::Jump));
+            if let Some(recursion) = self.check_single_column(column.name()) {
+                return Ok(Transformed::new(node, false, recursion));
             }
         }
 
@@ -322,29 +347,30 @@ impl<'a> TreeNodeRewriter for FilterCandidateBuilder<'a> {
     }
 
     /// After visiting all children, rewrite column references to nulls if
-    /// they are not in the file schema
+    /// they are not in the file schema.
+    /// We do this because they won't be relevant if they're not in the file 
schema, since that's
+    /// the only thing we're dealing with here as this is only used for the 
parquet pushdown during
+    /// scanning
     fn f_up(
         &mut self,
         expr: Arc<dyn PhysicalExpr>,
     ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
-        // if the expression is a column, is it in the file schema?
         if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+            // if the expression is a column, is it in the file schema?
             if self.file_schema.field_with_name(column.name()).is_err() {
-                // Replace the column reference with a NULL (using the type 
from the table schema)
-                // e.g. `column = 'foo'` is rewritten be transformed to `NULL 
= 'foo'`
-                //
-                // See comments on `FilterCandidateBuilder` for more 
information
-                return match self.table_schema.field_with_name(column.name()) {
-                    Ok(field) => {
-                        // return the null value corresponding to the data type
+                return self
+                    .table_schema
+                    .field_with_name(column.name())
+                    .and_then(|field| {
+                        // Replace the column reference with a NULL (using the 
type from the table schema)
+                        // e.g. `column = 'foo'` is rewritten 
to `NULL = 'foo'`
+                        //
+                        // See comments on `FilterCandidateBuilder` for more 
information
                         let null_value = 
ScalarValue::try_from(field.data_type())?;
-                        
Ok(Transformed::yes(Arc::new(Literal::new(null_value))))
-                    }
-                    Err(e) => {
-                        // If the column is not in the table schema, should 
throw the error
-                        arrow_err!(e)
-                    }
-                };
+                        Ok(Transformed::yes(Arc::new(Literal::new(null_value)) 
as _))
+                    })
+                    // If the column is not in the table schema, should throw 
the error
+                    .map_err(|e| arrow_datafusion_err!(e));
             }
         }
 
@@ -352,6 +378,69 @@ impl<'a> TreeNodeRewriter for FilterCandidateBuilder<'a> {
     }
 }
 
+type ProjectionAndExpr = (BTreeSet<usize>, Arc<dyn PhysicalExpr>);
+
+// Checks if a given expression can be pushed down into `ParquetExec` as 
opposed to being evaluated
+// post-parquet-scan in a `FilterExec`. If it can be pushed down, this returns 
all the
+// columns in the given expression so that they can be used in the parquet 
scanning, along with the
+// expression rewritten as defined in [`PushdownChecker::f_up`]
+fn pushdown_columns(
+    expr: Arc<dyn PhysicalExpr>,
+    file_schema: &Schema,
+    table_schema: &Schema,
+) -> Result<Option<ProjectionAndExpr>> {
+    let mut checker = PushdownChecker::new(file_schema, table_schema);
+
+    let expr = expr.rewrite(&mut checker).data()?;
+
+    
Ok((!checker.prevents_pushdown()).then_some((checker.required_column_indices, 
expr)))
+}
+
+/// Creates a PushdownChecker for a single use to check a given column with 
the given schemas. Used
+/// to check preemptively if a column name would prevent pushdown.
+/// Effectively does the inverse of what [`pushdown_columns`] does, but with a 
single given column
+/// (instead of traversing the entire tree to determine this)
+fn would_column_prevent_pushdown(
+    column_name: &str,
+    file_schema: &Schema,
+    table_schema: &Schema,
+) -> bool {
+    let mut checker = PushdownChecker::new(file_schema, table_schema);
+
+    // the return of this is only used for [`PushdownChecker::f_down()`], so 
we can safely ignore
+    // it here. I'm just verifying we know the return type of this so nobody 
accidentally changes
+    // the return type of this fn and it gets implicitly ignored here.
+    let _: Option<TreeNodeRecursion> = 
checker.check_single_column(column_name);
+
+    // and then return a value based on the state of the checker
+    checker.prevents_pushdown()
+}
+
+/// Recurses through expr as a tree, finds all `column`s, and checks if any of 
them would prevent
+/// this expression from being predicate pushed down. If any of them would, 
this returns false.
+/// Otherwise, true.
+pub fn can_expr_be_pushed_down_with_schemas(
+    expr: &datafusion_expr::Expr,
+    file_schema: &Schema,
+    table_schema: &Schema,
+) -> bool {
+    let mut can_be_pushed = true;
+    expr.apply(|expr| match expr {
+        datafusion_expr::Expr::Column(column) => {
+            can_be_pushed &=
+                !would_column_prevent_pushdown(column.name(), file_schema, 
table_schema);
+            Ok(if can_be_pushed {
+                TreeNodeRecursion::Jump
+            } else {
+                TreeNodeRecursion::Stop
+            })
+        }
+        _ => Ok(TreeNodeRecursion::Continue),
+    })
+    .unwrap(); // we never return an Err, so we can safely unwrap this
+    can_be_pushed
+}
+
 /// Computes the projection required to go from the file's schema order to the 
projected
 /// order expected by this filter
 ///
@@ -444,11 +533,13 @@ pub fn build_row_filter(
     // Determine which conjuncts can be evaluated as ArrowPredicates, if any
     let mut candidates: Vec<FilterCandidate> = predicates
         .into_iter()
-        .flat_map(|expr| {
+        .map(|expr| {
             FilterCandidateBuilder::new(expr.clone(), file_schema, 
table_schema)
                 .build(metadata)
-                .unwrap_or_default()
         })
+        .collect::<Result<Vec<_>, _>>()?
+        .into_iter()
+        .flatten()
         .collect();
 
     // no candidates
@@ -485,11 +576,12 @@ pub fn build_row_filter(
 #[cfg(test)]
 mod test {
     use super::*;
-    use crate::datasource::schema_adapter::DefaultSchemaAdapterFactory;
-    use crate::datasource::schema_adapter::SchemaAdapterFactory;
+    use crate::datasource::schema_adapter::{
+        DefaultSchemaAdapterFactory, SchemaAdapterFactory,
+    };
 
     use arrow::datatypes::Field;
-    use arrow_schema::TimeUnit::Nanosecond;
+    use arrow_schema::{Fields, TimeUnit::Nanosecond};
     use datafusion_expr::{cast, col, lit, Expr};
     use datafusion_physical_expr::planner::logical2physical;
     use datafusion_physical_plan::metrics::{Count, Time};
@@ -583,8 +675,9 @@ mod test {
             false,
         )]);
 
+        let table_ref = Arc::new(table_schema.clone());
         let schema_adapter =
-            DefaultSchemaAdapterFactory 
{}.create(Arc::new(table_schema.clone()));
+            DefaultSchemaAdapterFactory.create(Arc::clone(&table_ref), 
table_ref);
         let (schema_mapping, _) = schema_adapter
             .map_schema(&file_schema)
             .expect("creating schema mapping");
@@ -661,4 +754,87 @@ mod test {
             assert_eq!(projection, remapped)
         }
     }
+
+    #[test]
+    fn nested_data_structures_prevent_pushdown() {
+        let table_schema = get_basic_table_schema();
+
+        let file_schema = Schema::new(vec![Field::new(
+            "list_col",
+            DataType::Struct(Fields::empty()),
+            true,
+        )]);
+
+        let expr = col("list_col").is_not_null();
+
+        assert!(!can_expr_be_pushed_down_with_schemas(
+            &expr,
+            &file_schema,
+            &table_schema
+        ));
+    }
+
+    #[test]
+    fn projected_columns_prevent_pushdown() {
+        let table_schema = get_basic_table_schema();
+
+        let file_schema =
+            Schema::new(vec![Field::new("existing_col", DataType::Int64, 
true)]);
+
+        let expr = col("nonexistent_column").is_null();
+
+        assert!(!can_expr_be_pushed_down_with_schemas(
+            &expr,
+            &file_schema,
+            &table_schema
+        ));
+    }
+
+    #[test]
+    fn basic_expr_doesnt_prevent_pushdown() {
+        let table_schema = get_basic_table_schema();
+
+        let file_schema = Schema::new(vec![Field::new("str_col", 
DataType::Utf8, true)]);
+
+        let expr = col("str_col").is_null();
+
+        assert!(can_expr_be_pushed_down_with_schemas(
+            &expr,
+            &file_schema,
+            &table_schema
+        ));
+    }
+
+    #[test]
+    fn complex_expr_doesnt_prevent_pushdown() {
+        let table_schema = get_basic_table_schema();
+
+        let file_schema = Schema::new(vec![
+            Field::new("str_col", DataType::Utf8, true),
+            Field::new("int_col", DataType::UInt64, true),
+        ]);
+
+        let expr = col("str_col")
+            .is_not_null()
+            
.or(col("int_col").gt(Expr::Literal(ScalarValue::UInt64(Some(5)))));
+
+        assert!(can_expr_be_pushed_down_with_schemas(
+            &expr,
+            &file_schema,
+            &table_schema
+        ));
+    }
+
+    fn get_basic_table_schema() -> Schema {
+        let testdata = crate::test_util::parquet_test_data();
+        let file = 
std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
+            .expect("opening file");
+
+        let reader = SerializedFileReader::new(file).expect("creating reader");
+
+        let metadata = reader.metadata();
+
+        parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
+            .expect("parsing schema")
+    }
 }
diff --git a/datafusion/core/src/datasource/schema_adapter.rs 
b/datafusion/core/src/datasource/schema_adapter.rs
index de508f2c34..fdf3381758 100644
--- a/datafusion/core/src/datasource/schema_adapter.rs
+++ b/datafusion/core/src/datasource/schema_adapter.rs
@@ -35,7 +35,13 @@ use std::sync::Arc;
 /// other than null)
 pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
     /// Provides `SchemaAdapter`.
-    fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
+    // The design of this function is mostly modeled for the needs of 
DefaultSchemaAdapterFactory,
+    // read its implementation docs for the reasoning
+    fn create(
+        &self,
+        projected_table_schema: SchemaRef,
+        table_schema: SchemaRef,
+    ) -> Box<dyn SchemaAdapter>;
 }
 
 /// Adapt file-level [`RecordBatch`]es to a table schema, which may have a 
schema
@@ -96,17 +102,33 @@ pub trait SchemaMapper: Debug + Send + Sync {
 /// Implementation of [`SchemaAdapterFactory`] that maps columns by name
 /// and casts columns to the expected type.
 #[derive(Clone, Debug, Default)]
-pub struct DefaultSchemaAdapterFactory {}
+pub struct DefaultSchemaAdapterFactory;
 
 impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
-    fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
-        Box::new(DefaultSchemaAdapter { table_schema })
+    fn create(
+        &self,
+        projected_table_schema: SchemaRef,
+        table_schema: SchemaRef,
+    ) -> Box<dyn SchemaAdapter> {
+        Box::new(DefaultSchemaAdapter {
+            projected_table_schema,
+            table_schema,
+        })
     }
 }
 
+/// This SchemaAdapter requires both the table schema and the projected table 
schema because of the
+/// needs of the [`SchemaMapping`] it creates. Read its documentation for more 
details
 #[derive(Clone, Debug)]
 pub(crate) struct DefaultSchemaAdapter {
-    /// Schema for the table
+    /// The schema for the table, projected to include only the fields being 
output (projected) by the
+    /// associated ParquetExec
+    projected_table_schema: SchemaRef,
+    /// The entire table schema for the table we're using this to adapt.
+    ///
+    /// This is used to evaluate any filters pushed down into the scan
+    /// which may refer to columns that are not referred to anywhere
+    /// else in the plan.
     table_schema: SchemaRef,
 }
 
@@ -116,7 +138,7 @@ impl SchemaAdapter for DefaultSchemaAdapter {
     ///
     /// Panics if index is not in range for the table schema
     fn map_column_index(&self, index: usize, file_schema: &Schema) -> 
Option<usize> {
-        let field = self.table_schema.field(index);
+        let field = self.projected_table_schema.field(index);
         Some(file_schema.fields.find(field.name())?.0)
     }
 
@@ -133,11 +155,11 @@ impl SchemaAdapter for DefaultSchemaAdapter {
         file_schema: &Schema,
     ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
         let mut projection = Vec::with_capacity(file_schema.fields().len());
-        let mut field_mappings = vec![None; self.table_schema.fields().len()];
+        let mut field_mappings = vec![None; 
self.projected_table_schema.fields().len()];
 
         for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
             if let Some((table_idx, table_field)) =
-                self.table_schema.fields().find(file_field.name())
+                self.projected_table_schema.fields().find(file_field.name())
             {
                 match can_cast_types(file_field.data_type(), 
table_field.data_type()) {
                     true => {
@@ -158,8 +180,9 @@ impl SchemaAdapter for DefaultSchemaAdapter {
 
         Ok((
             Arc::new(SchemaMapping {
-                table_schema: self.table_schema.clone(),
+                projected_table_schema: self.projected_table_schema.clone(),
                 field_mappings,
+                table_schema: self.table_schema.clone(),
             }),
             projection,
         ))
@@ -168,39 +191,81 @@ impl SchemaAdapter for DefaultSchemaAdapter {
 
 /// The SchemaMapping struct holds a mapping from the file schema to the table 
schema
 /// and any necessary type conversions that need to be applied.
+///
+/// This needs both the projected table schema and full table schema because 
its different
+/// functions have different needs. The [`map_batch`] function is only used by 
the ParquetOpener to
+/// produce a RecordBatch which has the projected schema, since that's the 
schema which is supposed
+/// to come out of the execution of this query. [`map_partial_batch`], 
however, is used to create a
+/// RecordBatch with a schema that can be used for Parquet pushdown, meaning 
that it may contain
+/// fields which are not in the projected schema (as the fields that parquet 
pushdown filters
+/// operate on can be completely distinct from the fields that are projected 
(output) out of the
+/// ParquetExec).
+///
+/// [`map_partial_batch`] uses `table_schema` to create the resulting 
RecordBatch (as it could be
+/// operating on any fields in the schema), while [`map_batch`] uses 
`projected_table_schema` (as
+/// it can only operate on the projected fields).
+///
+/// [`map_batch`]: Self::map_batch
+/// [`map_partial_batch`]: Self::map_partial_batch
 #[derive(Debug)]
 pub struct SchemaMapping {
-    /// The schema of the table. This is the expected schema after conversion 
and it should match the schema of the query result.
-    table_schema: SchemaRef,
-    /// Mapping from field index in `table_schema` to index in projected 
file_schema
+    /// The schema of the table. This is the expected schema after conversion 
and it should match
+    /// the schema of the query result.
+    projected_table_schema: SchemaRef,
+    /// Mapping from field index in `projected_table_schema` to index in 
projected file_schema.
+    /// They are Options instead of just plain `usize`s because the table 
could have fields that
+    /// don't exist in the file.
     field_mappings: Vec<Option<usize>>,
+    /// The entire table schema, as opposed to the projected_table_schema 
(which only contains the
+    /// columns that we are projecting out of this query). This contains all 
fields in the table,
+    /// regardless of if they will be projected out or not.
+    table_schema: SchemaRef,
 }
 
 impl SchemaMapper for SchemaMapping {
-    /// Adapts a `RecordBatch` to match the `table_schema` using the stored 
mapping and conversions.
+    /// Adapts a `RecordBatch` to match the `projected_table_schema` using the 
stored mapping and
+    /// conversions. The produced RecordBatch has a schema that contains only 
the projected
+    /// columns, so if one needs a RecordBatch with a schema that references 
columns which are not
+    /// in the projected schema, it would be better to use `map_partial_batch`
     fn map_batch(&self, batch: RecordBatch) -> 
datafusion_common::Result<RecordBatch> {
         let batch_rows = batch.num_rows();
         let batch_cols = batch.columns().to_vec();
 
         let cols = self
-            .table_schema
+            .projected_table_schema
+            // go through each field in the projected schema
             .fields()
             .iter()
+            // and zip it with the index that maps fields from the projected 
table schema to the
+            // projected file schema in `batch`
             .zip(&self.field_mappings)
-            .map(|(field, file_idx)| match file_idx {
-                Some(batch_idx) => cast(&batch_cols[*batch_idx], 
field.data_type()),
-                None => Ok(new_null_array(field.data_type(), batch_rows)),
+            // and for each one...
+            .map(|(field, file_idx)| {
+                file_idx.map_or_else(
+                    // If this field only exists in the table, and not in the 
file, then we know
+                    // that it's null, so just return that.
+                    || Ok(new_null_array(field.data_type(), batch_rows)),
+                    // However, if it does exist in both, then try to cast it 
to the correct output
+                    // type
+                    |batch_idx| cast(&batch_cols[batch_idx], 
field.data_type()),
+                )
             })
             .collect::<datafusion_common::Result<Vec<_>, _>>()?;
 
         // Necessary to handle empty batches
         let options = 
RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
 
-        let schema = self.table_schema.clone();
+        let schema = self.projected_table_schema.clone();
         let record_batch = RecordBatch::try_new_with_options(schema, cols, 
&options)?;
         Ok(record_batch)
     }
 
+    /// Adapts a [`RecordBatch`]'s schema into one that has all the correct 
output types and only
+    /// contains the fields that exist in both the file schema and table 
schema.
+    ///
+    /// Unlike `map_batch` this method also preserves the columns that
+    /// may not appear in the final output (`projected_table_schema`) but may
+    /// appear in push down predicates
     fn map_partial_batch(
         &self,
         batch: RecordBatch,
@@ -208,15 +273,33 @@ impl SchemaMapper for SchemaMapping {
         let batch_cols = batch.columns().to_vec();
         let schema = batch.schema();
 
-        let mut cols = vec![];
-        let mut fields = vec![];
-        for (i, f) in schema.fields().iter().enumerate() {
-            let table_field = self.table_schema.field_with_name(f.name());
-            if let Ok(tf) = table_field {
-                cols.push(cast(&batch_cols[i], tf.data_type())?);
-                fields.push(tf.clone());
-            }
-        }
+        // for each field in the batch's schema (which is based on a file, not 
a table)...
+        let (cols, fields) = schema
+            .fields()
+            .iter()
+            .zip(batch_cols.iter())
+            .flat_map(|(field, batch_col)| {
+                self.table_schema
+                    // try to get the same field from the table schema that we 
have stored in self
+                    .field_with_name(field.name())
+                    // and if we don't have it, that's fine, ignore it. This 
may occur when we've
+                    // created an external table whose fields are a subset of 
the fields in this
+                    // file, then tried to read data from the file into this 
table. If that is the
+                    // case here, it's fine to ignore because we don't care 
about this field
+                    // anyways
+                    .ok()
+                    // but if we do have it,
+                    .map(|table_field| {
+                        // try to cast it into the correct output type. we 
don't want to ignore this
+                        // error, though, so it's propagated.
+                        cast(batch_col, table_field.data_type())
+                            // and if that works, return the field and column.
+                            .map(|new_col| (new_col, table_field.clone()))
+                    })
+            })
+            .collect::<Result<Vec<_>, _>>()?
+            .into_iter()
+            .unzip::<_, _, Vec<_>, Vec<_>>();
 
         // Necessary to handle empty batches
         let options = 
RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
@@ -322,12 +405,16 @@ mod tests {
     }
 
     #[derive(Debug)]
-    struct TestSchemaAdapterFactory {}
+    struct TestSchemaAdapterFactory;
 
     impl SchemaAdapterFactory for TestSchemaAdapterFactory {
-        fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+        fn create(
+            &self,
+            projected_table_schema: SchemaRef,
+            _table_schema: SchemaRef,
+        ) -> Box<dyn SchemaAdapter> {
             Box::new(TestSchemaAdapter {
-                table_schema: schema,
+                table_schema: projected_table_schema,
             })
         }
     }
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index cc35255dfe..2010a5c664 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -429,7 +429,7 @@ impl DefaultPhysicalPlanner {
         Ok(Some(plan))
     }
 
-    /// Given a single LogicalPlan node, map it to it's physical ExecutionPlan 
counterpart.
+    /// Given a single LogicalPlan node, map it to its physical ExecutionPlan 
counterpart.
     async fn map_logical_node_to_physical(
         &self,
         node: &LogicalPlan,
diff --git a/datafusion/expr-common/src/signature.rs 
b/datafusion/expr-common/src/signature.rs
index ffa5f17cec..d1553b3315 100644
--- a/datafusion/expr-common/src/signature.rs
+++ b/datafusion/expr-common/src/signature.rs
@@ -43,8 +43,8 @@ pub enum Volatility {
     Immutable,
     /// A stable function may return different values given the same input 
across different
     /// queries but must return the same value for a given input within a 
query. An example of
-    /// this is the `Now` function. DataFusion
-    /// will attempt to inline `Stable` functions during planning, when 
possible.
+    /// this is the `Now` function. DataFusion will attempt to inline `Stable` 
functions
+    /// during planning, when possible.
     /// For query `select col1, now() from t1`, it might take a while to 
execute but
     /// `now()` column will be the same for each output row, which is evaluated
     /// during planning.
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index 1c94c7f3af..b3f9b26fa4 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1980,7 +1980,7 @@ impl LogicalPlan {
                             .map(|i| &input_columns[*i])
                             .collect::<Vec<&Column>>();
                         // get items from input_columns indexed by 
list_col_indices
-                        write!(f, "Unnest: lists[{}] structs[{}]", 
+                        write!(f, "Unnest: lists[{}] structs[{}]",
                         expr_vec_fmt!(list_type_columns),
                         expr_vec_fmt!(struct_type_columns))
                     }
@@ -2124,11 +2124,13 @@ impl Projection {
 /// the `Result` will contain the schema; otherwise, it will contain an error.
 pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> 
Result<Arc<DFSchema>> {
     let metadata = input.schema().metadata().clone();
-    let mut schema =
-        DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, 
metadata)?;
-    schema = 
schema.with_functional_dependencies(calc_func_dependencies_for_project(
-        exprs, input,
-    )?)?;
+
+    let schema =
+        DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, 
metadata)?
+            .with_functional_dependencies(calc_func_dependencies_for_project(
+                exprs, input,
+            )?)?;
+
     Ok(Arc::new(schema))
 }
 
diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs 
b/datafusion/optimizer/src/optimize_projections/mod.rs
index 0623be504b..65db164c6e 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -176,7 +176,7 @@ fn optimize_projections(
             let all_exprs_iter = 
new_group_bys.iter().chain(new_aggr_expr.iter());
             let schema = aggregate.input.schema();
             let necessary_indices =
-                RequiredIndicies::new().with_exprs(schema, all_exprs_iter)?;
+                RequiredIndicies::new().with_exprs(schema, all_exprs_iter);
             let necessary_exprs = necessary_indices.get_required_exprs(schema);
 
             return optimize_projections(
@@ -216,8 +216,7 @@ fn optimize_projections(
 
             // Get all the required column indices at the input, either by the
             // parent or window expression requirements.
-            let required_indices =
-                child_reqs.with_exprs(&input_schema, &new_window_expr)?;
+            let required_indices = child_reqs.with_exprs(&input_schema, 
&new_window_expr);
 
             return optimize_projections(
                 Arc::unwrap_or_clone(window.input),
@@ -269,7 +268,6 @@ fn optimize_projections(
             .map(LogicalPlan::TableScan)
             .map(Transformed::yes);
         }
-
         // Other node types are handled below
         _ => {}
     };
@@ -761,7 +759,7 @@ fn rewrite_projection_given_requirements(
     let exprs_used = indices.get_at_indices(&expr);
 
     let required_indices =
-        RequiredIndicies::new().with_exprs(input.schema(), exprs_used.iter())?;
+        RequiredIndicies::new().with_exprs(input.schema(), exprs_used.iter());
 
     // rewrite the children projection, and if they are changed rewrite the
     // projection down
@@ -781,8 +779,8 @@ fn rewrite_projection_given_requirements(
 /// - input schema of the projection, output schema of the projection are 
same, and
 /// - all projection expressions are either Column or Literal
 fn is_projection_unnecessary(input: &LogicalPlan, proj_exprs: &[Expr]) -> 
Result<bool> {
-    Ok(&projection_schema(input, proj_exprs)? == input.schema()
-        && proj_exprs.iter().all(is_expr_trivial))
+    let proj_schema = projection_schema(input, proj_exprs)?;
+    Ok(&proj_schema == input.schema() && 
proj_exprs.iter().all(is_expr_trivial))
 }
 
 #[cfg(test)]
diff --git a/datafusion/optimizer/src/optimize_projections/required_indices.rs 
b/datafusion/optimizer/src/optimize_projections/required_indices.rs
index a9a18898c8..60d8ef1a8e 100644
--- a/datafusion/optimizer/src/optimize_projections/required_indices.rs
+++ b/datafusion/optimizer/src/optimize_projections/required_indices.rs
@@ -96,7 +96,7 @@ impl RequiredIndicies {
         // Add indices of the child fields referred to by the expressions in 
the
         // parent
         plan.apply_expressions(|e| {
-            self.add_expr(schema, e)?;
+            self.add_expr(schema, e);
             Ok(TreeNodeRecursion::Continue)
         })?;
         Ok(self.compact())
@@ -111,7 +111,7 @@ impl RequiredIndicies {
     ///
     /// * `input_schema`: The input schema to analyze for index requirements.
     /// * `expr`: An expression for which we want to find necessary field 
indices.
-    fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) -> 
Result<()> {
+    fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) {
         // TODO could remove these clones (and visit the expression directly)
         let mut cols = expr.column_refs();
         // Get outer-referenced (subquery) columns:
@@ -122,7 +122,6 @@ impl RequiredIndicies {
                 self.indices.push(idx);
             }
         }
-        Ok(())
     }
 
     /// Adds the indices of the fields referred to by the given expressions
@@ -136,14 +135,14 @@ impl RequiredIndicies {
         self,
         schema: &DFSchemaRef,
         exprs: impl IntoIterator<Item = &'a Expr>,
-    ) -> Result<Self> {
+    ) -> Self {
         exprs
             .into_iter()
-            .try_fold(self, |mut acc, expr| {
-                acc.add_expr(schema, expr)?;
-                Ok(acc)
+            .fold(self, |mut acc, expr| {
+                acc.add_expr(schema, expr);
+                acc
             })
-            .map(|acc| acc.compact())
+            .compact()
     }
 
     /// Adds all `indices` into this instance.
diff --git a/datafusion/physical-expr/src/expressions/binary.rs 
b/datafusion/physical-expr/src/expressions/binary.rs
index e115ec3c74..236b24dd40 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -213,8 +213,8 @@ macro_rules! compute_utf8_flag_op_scalar {
             .downcast_ref::<$ARRAYTYPE>()
             .expect("compute_utf8_flag_op_scalar failed to downcast array");
 
-        if let 
ScalarValue::Utf8(Some(string_value))|ScalarValue::LargeUtf8(Some(string_value))
 = $RIGHT {
-            let flag = if $FLAG { Some("i") } else { None };
+        if let ScalarValue::Utf8(Some(string_value)) | 
ScalarValue::LargeUtf8(Some(string_value)) = $RIGHT {
+            let flag = $FLAG.then_some("i");
             let mut array =
                 paste::expr! {[<$OP _utf8_scalar>]}(&ll, &string_value, flag)?;
             if $NOT {
diff --git a/datafusion/physical-expr/src/expressions/column.rs 
b/datafusion/physical-expr/src/expressions/column.rs
index bf15821bca..4aad959584 100644
--- a/datafusion/physical-expr/src/expressions/column.rs
+++ b/datafusion/physical-expr/src/expressions/column.rs
@@ -163,7 +163,10 @@ impl Column {
             internal_err!(
                 "PhysicalExpr Column references column '{}' at index {} 
(zero-based) but input schema only has {} columns: {:?}",
                 self.name,
-                self.index, input_schema.fields.len(), 
input_schema.fields().iter().map(|f| f.name().clone()).collect::<Vec<String>>())
+                self.index,
+                input_schema.fields.len(),
+                input_schema.fields().iter().map(|f| 
f.name()).collect::<Vec<_>>()
+            )
         }
     }
 }
diff --git a/datafusion/proto/src/logical_plan/mod.rs 
b/datafusion/proto/src/logical_plan/mod.rs
index bf5394ec01..db94563b7a 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -362,13 +362,18 @@ impl AsLogicalPlan for LogicalPlanNode {
                             "logical_plan::from_proto() Unsupported file 
format '{self:?}'"
                         ))
                     })? {
-                        #[cfg(feature = "parquet")]
+                        #[cfg_attr(not(feature = "parquet"), 
allow(unused_variables))]
                         FileFormatType::Parquet(protobuf::ParquetFormat 
{options}) => {
-                            let mut parquet = ParquetFormat::default();
-                            if let Some(options) = options {
-                                parquet = 
parquet.with_options(options.try_into()?)
+                            #[cfg(feature = "parquet")]
+                            {
+                                let mut parquet = ParquetFormat::default();
+                                if let Some(options) = options {
+                                    parquet = 
parquet.with_options(options.try_into()?)
+                                }
+                                Arc::new(parquet)
                             }
-                            Arc::new(parquet)
+                            #[cfg(not(feature = "parquet"))]
+                            panic!("Unable to process parquet file since 
`parquet` feature is not enabled");
                         }
                         FileFormatType::Csv(protobuf::CsvFormat {
                             options
diff --git a/datafusion/proto/src/physical_plan/mod.rs 
b/datafusion/proto/src/physical_plan/mod.rs
index e1cc37091b..6abfc71288 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -234,30 +234,35 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                 .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
                 .build(),
             )),
-            #[cfg(feature = "parquet")]
+            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
             PhysicalPlanType::ParquetScan(scan) => {
-                let base_config = parse_protobuf_file_scan_config(
-                    scan.base_conf.as_ref().unwrap(),
-                    registry,
-                    extension_codec,
-                )?;
-                let predicate = scan
-                    .predicate
-                    .as_ref()
-                    .map(|expr| {
-                        parse_physical_expr(
-                            expr,
-                            registry,
-                            base_config.file_schema.as_ref(),
-                            extension_codec,
-                        )
-                    })
-                    .transpose()?;
-                let mut builder = ParquetExec::builder(base_config);
-                if let Some(predicate) = predicate {
-                    builder = builder.with_predicate(predicate)
+                #[cfg(feature = "parquet")]
+                {
+                    let base_config = parse_protobuf_file_scan_config(
+                        scan.base_conf.as_ref().unwrap(),
+                        registry,
+                        extension_codec,
+                    )?;
+                    let predicate = scan
+                        .predicate
+                        .as_ref()
+                        .map(|expr| {
+                            parse_physical_expr(
+                                expr,
+                                registry,
+                                base_config.file_schema.as_ref(),
+                                extension_codec,
+                            )
+                        })
+                        .transpose()?;
+                    let mut builder = ParquetExec::builder(base_config);
+                    if let Some(predicate) = predicate {
+                        builder = builder.with_predicate(predicate)
+                    }
+                    Ok(builder.build_arc())
                 }
-                Ok(builder.build_arc())
+                #[cfg(not(feature = "parquet"))]
+                panic!("Unable to process a Parquet PhysicalPlan when 
`parquet` feature is not enabled")
             }
             PhysicalPlanType::AvroScan(scan) => {
                 Ok(Arc::new(AvroExec::new(parse_protobuf_file_scan_config(
@@ -1068,35 +1073,45 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                     sort_order,
                 )))
             }
+            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
             PhysicalPlanType::ParquetSink(sink) => {
-                let input =
-                    into_physical_plan(&sink.input, registry, runtime, 
extension_codec)?;
-
-                let data_sink: ParquetSink = sink
-                    .sink
-                    .as_ref()
-                    .ok_or_else(|| proto_error("Missing required field in 
protobuf"))?
-                    .try_into()?;
-                let sink_schema = input.schema();
-                let sort_order = sink
-                    .sort_order
-                    .as_ref()
-                    .map(|collection| {
-                        parse_physical_sort_exprs(
-                            &collection.physical_sort_expr_nodes,
-                            registry,
-                            &sink_schema,
-                            extension_codec,
-                        )
-                        .map(|item| 
PhysicalSortRequirement::from_sort_exprs(&item))
-                    })
-                    .transpose()?;
-                Ok(Arc::new(DataSinkExec::new(
-                    input,
-                    Arc::new(data_sink),
-                    sink_schema,
-                    sort_order,
-                )))
+                #[cfg(feature = "parquet")]
+                {
+                    let input = into_physical_plan(
+                        &sink.input,
+                        registry,
+                        runtime,
+                        extension_codec,
+                    )?;
+
+                    let data_sink: ParquetSink = sink
+                        .sink
+                        .as_ref()
+                        .ok_or_else(|| proto_error("Missing required field in 
protobuf"))?
+                        .try_into()?;
+                    let sink_schema = input.schema();
+                    let sort_order = sink
+                        .sort_order
+                        .as_ref()
+                        .map(|collection| {
+                            parse_physical_sort_exprs(
+                                &collection.physical_sort_expr_nodes,
+                                registry,
+                                &sink_schema,
+                                extension_codec,
+                            )
+                            .map(|item| 
PhysicalSortRequirement::from_sort_exprs(&item))
+                        })
+                        .transpose()?;
+                    Ok(Arc::new(DataSinkExec::new(
+                        input,
+                        Arc::new(data_sink),
+                        sink_schema,
+                        sort_order,
+                    )))
+                }
+                #[cfg(not(feature = "parquet"))]
+                panic!("Trying to use ParquetSink without `parquet` feature 
enabled");
             }
             PhysicalPlanType::Unnest(unnest) => {
                 let input = into_physical_plan(
@@ -1954,6 +1969,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                 });
             }
 
+            #[cfg(feature = "parquet")]
             if let Some(sink) = 
exec.sink().as_any().downcast_ref::<ParquetSink>() {
                 return Ok(protobuf::PhysicalPlanNode {
                     physical_plan_type: 
Some(PhysicalPlanType::ParquetSink(Box::new(
diff --git 
a/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt.temp 
b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt.temp
new file mode 100644
index 0000000000..00e74a207b
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt.temp
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+## Join Tests
+##########
+
+# turn off repartition_joins
+statement ok
+set datafusion.optimizer.repartition_joins = false;
+
+include ./join.slt
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt 
b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index d69662f75d..24ffb963bb 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -81,21 +81,15 @@ EXPLAIN select a from t_pushdown where b > 2 ORDER BY a;
 ----
 logical_plan
 01)Sort: t_pushdown.a ASC NULLS LAST
-02)--Projection: t_pushdown.a
-03)----Filter: t_pushdown.b > Int32(2)
-04)------TableScan: t_pushdown projection=[a, b], 
partial_filters=[t_pushdown.b > Int32(2)]
+02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > 
Int32(2)]
 physical_plan
 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----CoalesceBatchesExec: target_batch_size=8192
-04)------FilterExec: b@1 > 2, projection=[a@0]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
-06)----------ParquetExec: file_groups={2 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]},
 projection=[a, b], predicate=b@1 > 2, pruning_predicate=CASE WHEN 
b_null_count@1 = b_row_count@2 THEN false ELSE b_max@0 > 2 END, 
required_guarantees=[]
+03)----ParquetExec: file_groups={2 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]},
 projection=[a], predicate=b@1 > 2, pruning_predicate=CASE WHEN b_null_count@1 
= b_row_count@2 THEN false ELSE b_max@0 > 2 END, required_guarantees=[]
 
 
 # When filter pushdown *is* enabled, ParquetExec can filter exactly,
 # not just metadata, so we expect to see no FilterExec
-# once https://github.com/apache/datafusion/issues/4028 is fixed
 query T
 select a from t_pushdown where b > 2 ORDER BY a;
 ----
@@ -133,16 +127,11 @@ EXPLAIN select a from t_pushdown where b > 2 AND a IS NOT 
NULL order by a;
 ----
 logical_plan
 01)Sort: t_pushdown.a ASC NULLS LAST
-02)--Projection: t_pushdown.a
-03)----Filter: t_pushdown.b > Int32(2) AND t_pushdown.a IS NOT NULL
-04)------TableScan: t_pushdown projection=[a, b], 
partial_filters=[t_pushdown.b > Int32(2), t_pushdown.a IS NOT NULL]
+02)--TableScan: t_pushdown projection=[a], full_filters=[t_pushdown.b > 
Int32(2), t_pushdown.a IS NOT NULL]
 physical_plan
 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----CoalesceBatchesExec: target_batch_size=8192
-04)------FilterExec: b@1 > 2 AND a@0 IS NOT NULL, projection=[a@0]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
-06)----------ParquetExec: file_groups={2 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]},
 projection=[a, b], predicate=b@1 > 2 AND a@0 IS NOT NULL, 
pruning_predicate=CASE WHEN b_null_count@1 = b_row_count@2 THEN false ELSE 
b_max@0 > 2 END AND a_null_count@4 != a_row_count@3, required_guarantees=[]
+03)----ParquetExec: file_groups={2 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]},
 projection=[a], predicate=b@1 > 2 AND a@0 IS NOT NULL, pruning_predicate=CASE 
WHEN b_null_count@1 = b_row_count@2 THEN false ELSE b_max@0 > 2 END AND 
a_null_count@4 != a_row_count@3, required_guarantees=[]
 
 
 query I
@@ -155,16 +144,11 @@ EXPLAIN select b from t_pushdown where a = 'bar' order by 
b;
 ----
 logical_plan
 01)Sort: t_pushdown.b ASC NULLS LAST
-02)--Projection: t_pushdown.b
-03)----Filter: t_pushdown.a = Utf8("bar")
-04)------TableScan: t_pushdown projection=[a, b], 
partial_filters=[t_pushdown.a = Utf8("bar")]
+02)--TableScan: t_pushdown projection=[b], full_filters=[t_pushdown.a = 
Utf8("bar")]
 physical_plan
 01)SortPreservingMergeExec: [b@0 ASC NULLS LAST]
 02)--SortExec: expr=[b@0 ASC NULLS LAST], preserve_partitioning=[true]
-03)----CoalesceBatchesExec: target_batch_size=8192
-04)------FilterExec: a@0 = bar, projection=[b@1]
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
-06)----------ParquetExec: file_groups={2 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]},
 projection=[a, b], predicate=a@0 = bar, pruning_predicate=CASE WHEN 
a_null_count@2 = a_row_count@3 THEN false ELSE a_min@0 <= bar AND bar <= 
a_max@1 END, required_guarantees=[a in (bar)]
+03)----ParquetExec: file_groups={2 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]},
 projection=[b], predicate=a@0 = bar, pruning_predicate=CASE WHEN 
a_null_count@2 = a_row_count@3 THEN false ELSE a_min@0 <= bar AND bar <= 
a_max@1 END, required_guarantees=[a in (bar)]
 
 ## cleanup
 statement ok
diff --git a/datafusion/sqllogictest/test_files/repartition.slt 
b/datafusion/sqllogictest/test_files/repartition.slt
index 2d59ad2b5b..630674bb09 100644
--- a/datafusion/sqllogictest/test_files/repartition.slt
+++ b/datafusion/sqllogictest/test_files/repartition.slt
@@ -146,4 +146,4 @@ FROM t1 WHERE ((false > (v1 = v1)) IS DISTINCT FROM true);
 statement ok
 DROP TABLE t1;
 
-# End repartition on empty columns test
\ No newline at end of file
+# End repartition on empty columns test
diff --git a/datafusion/sqllogictest/test_files/select.slt 
b/datafusion/sqllogictest/test_files/select.slt
index bdd8deff18..05de3e0b80 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -1755,4 +1755,4 @@ SELECT "test.a" FROM (SELECT a AS "test.a" FROM test)
 1
 
 statement ok
-DROP TABLE test;
\ No newline at end of file
+DROP TABLE test;
diff --git a/datafusion/sqllogictest/test_files/string_view.slt 
b/datafusion/sqllogictest/test_files/string_view.slt
index 7df43bb7ed..c3b5fa8fc4 100644
--- a/datafusion/sqllogictest/test_files/string_view.slt
+++ b/datafusion/sqllogictest/test_files/string_view.slt
@@ -1638,4 +1638,3 @@ select column2|| ' ' ||column3 from temp;
 ----
 rust fast
 datafusion cool
-
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index 505c66aef0..1f90b94aee 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -4872,4 +4872,4 @@ query error DataFusion error: Execution error: Expected a 
signed integer literal
 SELECT NTH_VALUE('+Inf'::Double, v1) OVER (PARTITION BY v1) FROM t1;
 
 statement ok
-DROP TABLE t1;
\ No newline at end of file
+DROP TABLE t1;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to