This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ca55f1ca9a Revert use file schema in parquet pruning (#16086)
ca55f1ca9a is described below

commit ca55f1ca9a42b6c04c4d1e8217a1885001008346
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Wed May 21 05:52:29 2025 -0700

    Revert use file schema in parquet pruning (#16086)
    
    * wip
    
    * comment
    
    * Update datafusion/core/src/datasource/physical_plan/parquet.rs
    
    * remove prints
    
    * better test
    
    * fmt
---
 .../core/src/datasource/physical_plan/parquet.rs   | 103 ++++++++++++++++++++-
 datafusion/datasource-parquet/src/opener.rs        |  28 +++---
 datafusion/datasource-parquet/src/row_filter.rs    |   4 +-
 datafusion/datasource-parquet/src/source.rs        |   2 +-
 4 files changed, 120 insertions(+), 17 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 0da230682b..fb4eb13db1 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -39,7 +39,7 @@ mod tests {
     use crate::test::object_store::local_unpartitioned_file;
     use arrow::array::{
         ArrayRef, AsArray, Date64Array, Int32Array, Int64Array, Int8Array, 
StringArray,
-        StructArray,
+        StringViewArray, StructArray,
     };
     use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
     use arrow::record_batch::RecordBatch;
@@ -100,6 +100,7 @@ mod tests {
         predicate: Option<Expr>,
         pushdown_predicate: bool,
         page_index_predicate: bool,
+        bloom_filters: bool,
     }
 
     impl RoundTrip {
@@ -132,6 +133,11 @@ mod tests {
             self
         }
 
+        fn with_bloom_filters(mut self) -> Self {
+            self.bloom_filters = true;
+            self
+        }
+
         /// run the test, returning only the resulting RecordBatches
         async fn round_trip_to_batches(
             self,
@@ -156,10 +162,20 @@ mod tests {
                 source = source
                     .with_pushdown_filters(true)
                     .with_reorder_filters(true);
+            } else {
+                source = source.with_pushdown_filters(false);
             }
 
             if self.page_index_predicate {
                 source = source.with_enable_page_index(true);
+            } else {
+                source = source.with_enable_page_index(false);
+            }
+
+            if self.bloom_filters {
+                source = source.with_bloom_filter_on_read(true);
+            } else {
+                source = source.with_bloom_filter_on_read(false);
             }
 
             source.with_schema(Arc::clone(&file_schema))
@@ -817,7 +833,7 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn evolved_schema_filter() {
+    async fn evolved_schema_column_order_filter() {
         let c1: ArrayRef =
             Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
 
@@ -848,6 +864,88 @@ mod tests {
         assert_eq!(read.len(), 0);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_column_type_filter_strings() {
+        // The table and filter have a common data type, but the file schema 
differs
+        let c1: ArrayRef =
+            Arc::new(StringViewArray::from(vec![Some("foo"), Some("bar")]));
+        let batch = create_batch(vec![("c1", c1.clone())]);
+
+        let schema = Arc::new(Schema::new(vec![Field::new("c1", 
DataType::Utf8, false)]));
+
+        // Predicate should prune all row groups
+        let filter = 
col("c1").eq(lit(ScalarValue::Utf8(Some("aaa".to_string()))));
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_schema(schema.clone())
+            .round_trip(vec![batch.clone()])
+            .await;
+        // There should be no predicate evaluation errors
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
+        assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0);
+        assert_eq!(rt.batches.unwrap().len(), 0);
+
+        // Predicate should prune no row groups
+        let filter = 
col("c1").eq(lit(ScalarValue::Utf8(Some("foo".to_string()))));
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_schema(schema)
+            .round_trip(vec![batch])
+            .await;
+        // There should be no predicate evaluation errors
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
+        assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0);
+        let read = rt
+            .batches
+            .unwrap()
+            .iter()
+            .map(|b| b.num_rows())
+            .sum::<usize>();
+        assert_eq!(read, 2, "Expected 2 rows to match the predicate");
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_column_type_filter_ints() {
+        // The table and filter have a common data type, but the file schema 
differs
+        let c1: ArrayRef = Arc::new(Int8Array::from(vec![Some(1), Some(2)]));
+        let batch = create_batch(vec![("c1", c1.clone())]);
+
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt64, 
false)]));
+
+        // Predicate should prune all row groups
+        let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(5))));
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_schema(schema.clone())
+            .round_trip(vec![batch.clone()])
+            .await;
+        // There should be no predicate evaluation errors
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
+        assert_eq!(rt.batches.unwrap().len(), 0);
+
+        // Predicate should prune no row groups
+        let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1))));
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_schema(schema)
+            .round_trip(vec![batch])
+            .await;
+        // There should be no predicate evaluation errors
+        let metrics = rt.parquet_exec.metrics().unwrap();
+        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
+        let read = rt
+            .batches
+            .unwrap()
+            .iter()
+            .map(|b| b.num_rows())
+            .sum::<usize>();
+        assert_eq!(read, 2, "Expected 2 rows to match the predicate");
+    }
+
     #[tokio::test]
     async fn evolved_schema_disjoint_schema_filter() {
         let c1: ArrayRef =
@@ -1748,6 +1846,7 @@ mod tests {
         let rt = RoundTrip::new()
             .with_predicate(filter.clone())
             .with_pushdown_predicate()
+            .with_bloom_filters()
             .round_trip(vec![batch1])
             .await;
 
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 376b403361..9e14425074 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -55,8 +55,9 @@ pub(super) struct ParquetOpener {
     pub limit: Option<usize>,
     /// Optional predicate to apply during the scan
     pub predicate: Option<Arc<dyn PhysicalExpr>>,
-    /// Schema of the output table
-    pub table_schema: SchemaRef,
+    /// Schema of the output table without partition columns.
+    /// This is the schema we coerce the physical file schema into.
+    pub logical_file_schema: SchemaRef,
     /// Optional hint for how large the initial request to read parquet 
metadata
     /// should be
     pub metadata_size_hint: Option<usize>,
@@ -104,13 +105,13 @@ impl FileOpener for ParquetOpener {
         let batch_size = self.batch_size;
 
         let projected_schema =
-            SchemaRef::from(self.table_schema.project(&self.projection)?);
+            
SchemaRef::from(self.logical_file_schema.project(&self.projection)?);
         let schema_adapter_factory = Arc::clone(&self.schema_adapter_factory);
         let schema_adapter = self
             .schema_adapter_factory
-            .create(projected_schema, Arc::clone(&self.table_schema));
+            .create(projected_schema, Arc::clone(&self.logical_file_schema));
         let predicate = self.predicate.clone();
-        let table_schema = Arc::clone(&self.table_schema);
+        let logical_file_schema = Arc::clone(&self.logical_file_schema);
         let reorder_predicates = self.reorder_filters;
         let pushdown_filters = self.pushdown_filters;
         let coerce_int96 = self.coerce_int96;
@@ -141,17 +142,20 @@ impl FileOpener for ParquetOpener {
                     .await?;
 
             // Note about schemas: we are actually dealing with **3 different 
schemas** here:
-            // - The table schema as defined by the TableProvider. This is 
what the user sees, what they get when they `SELECT * FROM table`, etc.
-            // - The "virtual" file schema: this is the table schema minus any 
hive partition columns and projections. This is what the file schema is coerced 
to.
+            // - The table schema as defined by the TableProvider.
+            //   This is what the user sees, what they get when they `SELECT * 
FROM table`, etc.
+            // - The logical file schema: this is the table schema minus any 
hive partition columns and projections.
+            //   This is what the physical file schema is coerced to.
             // - The physical file schema: this is the schema as defined by 
the parquet file. This is what the parquet file actually contains.
             let mut physical_file_schema = 
Arc::clone(reader_metadata.schema());
 
             // The schema loaded from the file may not be the same as the
             // desired schema (for example if we want to instruct the parquet
             // reader to read strings using Utf8View instead). Update if 
necessary
-            if let Some(merged) =
-                apply_file_schema_type_coercions(&table_schema, 
&physical_file_schema)
-            {
+            if let Some(merged) = apply_file_schema_type_coercions(
+                &logical_file_schema,
+                &physical_file_schema,
+            ) {
                 physical_file_schema = Arc::new(merged);
                 options = 
options.with_schema(Arc::clone(&physical_file_schema));
                 reader_metadata = ArrowReaderMetadata::try_new(
@@ -178,7 +182,7 @@ impl FileOpener for ParquetOpener {
             // Build predicates for this specific file
             let (pruning_predicate, page_pruning_predicate) = 
build_pruning_predicates(
                 predicate.as_ref(),
-                &physical_file_schema,
+                &logical_file_schema,
                 &predicate_creation_errors,
             );
 
@@ -215,7 +219,7 @@ impl FileOpener for ParquetOpener {
                 let row_filter = row_filter::build_row_filter(
                     &predicate,
                     &physical_file_schema,
-                    &table_schema,
+                    &logical_file_schema,
                     builder.metadata(),
                     reorder_predicates,
                     &file_metrics,
diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index cd0cbf087f..cde9e56c92 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -426,7 +426,7 @@ fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<boo
 pub fn build_row_filter(
     expr: &Arc<dyn PhysicalExpr>,
     physical_file_schema: &SchemaRef,
-    table_schema: &SchemaRef,
+    logical_file_schema: &SchemaRef,
     metadata: &ParquetMetaData,
     reorder_predicates: bool,
     file_metrics: &ParquetFileMetrics,
@@ -447,7 +447,7 @@ pub fn build_row_filter(
             FilterCandidateBuilder::new(
                 Arc::clone(expr),
                 Arc::clone(physical_file_schema),
-                Arc::clone(table_schema),
+                Arc::clone(logical_file_schema),
                 Arc::clone(schema_adapter_factory),
             )
             .build(metadata)
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 13684db8ea..69347f440c 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -481,7 +481,7 @@ impl FileSource for ParquetSource {
                 .expect("Batch size must set before creating ParquetOpener"),
             limit: base_config.limit,
             predicate: self.predicate.clone(),
-            table_schema: Arc::clone(&base_config.file_schema),
+            logical_file_schema: Arc::clone(&base_config.file_schema),
             metadata_size_hint: self.metadata_size_hint,
             metrics: self.metrics().clone(),
             parquet_file_reader_factory,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to