This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ca55f1ca9a Revert use file schema in parquet pruning (#16086)
ca55f1ca9a is described below
commit ca55f1ca9a42b6c04c4d1e8217a1885001008346
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Wed May 21 05:52:29 2025 -0700
Revert use file schema in parquet pruning (#16086)
* wip
* comment
* Update datafusion/core/src/datasource/physical_plan/parquet.rs
* remove prints
* better test
* fmt
---
.../core/src/datasource/physical_plan/parquet.rs | 103 ++++++++++++++++++++-
datafusion/datasource-parquet/src/opener.rs | 28 +++---
datafusion/datasource-parquet/src/row_filter.rs | 4 +-
datafusion/datasource-parquet/src/source.rs | 2 +-
4 files changed, 120 insertions(+), 17 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 0da230682b..fb4eb13db1 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -39,7 +39,7 @@ mod tests {
use crate::test::object_store::local_unpartitioned_file;
use arrow::array::{
ArrayRef, AsArray, Date64Array, Int32Array, Int64Array, Int8Array, StringArray,
- StructArray,
+ StringViewArray, StructArray,
};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
use arrow::record_batch::RecordBatch;
@@ -100,6 +100,7 @@ mod tests {
predicate: Option<Expr>,
pushdown_predicate: bool,
page_index_predicate: bool,
+ bloom_filters: bool,
}
impl RoundTrip {
@@ -132,6 +133,11 @@ mod tests {
self
}
+ fn with_bloom_filters(mut self) -> Self {
+ self.bloom_filters = true;
+ self
+ }
+
/// run the test, returning only the resulting RecordBatches
async fn round_trip_to_batches(
self,
@@ -156,10 +162,20 @@ mod tests {
source = source
.with_pushdown_filters(true)
.with_reorder_filters(true);
+ } else {
+ source = source.with_pushdown_filters(false);
}
if self.page_index_predicate {
source = source.with_enable_page_index(true);
+ } else {
+ source = source.with_enable_page_index(false);
+ }
+
+ if self.bloom_filters {
+ source = source.with_bloom_filter_on_read(true);
+ } else {
+ source = source.with_bloom_filter_on_read(false);
}
source.with_schema(Arc::clone(&file_schema))
@@ -817,7 +833,7 @@ mod tests {
}
#[tokio::test]
- async fn evolved_schema_filter() {
+ async fn evolved_schema_column_order_filter() {
let c1: ArrayRef =
Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
@@ -848,6 +864,88 @@ mod tests {
assert_eq!(read.len(), 0);
}
+ #[tokio::test]
+ async fn evolved_schema_column_type_filter_strings() {
+ // The table and filter have a common data type, but the file schema differs
+ let c1: ArrayRef =
+ Arc::new(StringViewArray::from(vec![Some("foo"), Some("bar")]));
+ let batch = create_batch(vec![("c1", c1.clone())]);
+
+ let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)]));
+
+ // Predicate should prune all row groups
+ let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("aaa".to_string()))));
+ let rt = RoundTrip::new()
+ .with_predicate(filter)
+ .with_schema(schema.clone())
+ .round_trip(vec![batch.clone()])
+ .await;
+ // There should be no predicate evaluation errors
+ let metrics = rt.parquet_exec.metrics().unwrap();
+ assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
+ assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0);
+ assert_eq!(rt.batches.unwrap().len(), 0);
+
+ // Predicate should prune no row groups
+ let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("foo".to_string()))));
+ let rt = RoundTrip::new()
+ .with_predicate(filter)
+ .with_schema(schema)
+ .round_trip(vec![batch])
+ .await;
+ // There should be no predicate evaluation errors
+ let metrics = rt.parquet_exec.metrics().unwrap();
+ assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
+ assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0);
+ let read = rt
+ .batches
+ .unwrap()
+ .iter()
+ .map(|b| b.num_rows())
+ .sum::<usize>();
+ assert_eq!(read, 2, "Expected 2 rows to match the predicate");
+ }
+
+ #[tokio::test]
+ async fn evolved_schema_column_type_filter_ints() {
+ // The table and filter have a common data type, but the file schema differs
+ let c1: ArrayRef = Arc::new(Int8Array::from(vec![Some(1), Some(2)]));
+ let batch = create_batch(vec![("c1", c1.clone())]);
+
+ let schema =
+ Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt64, false)]));
+
+ // Predicate should prune all row groups
+ let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(5))));
+ let rt = RoundTrip::new()
+ .with_predicate(filter)
+ .with_schema(schema.clone())
+ .round_trip(vec![batch.clone()])
+ .await;
+ // There should be no predicate evaluation errors
+ let metrics = rt.parquet_exec.metrics().unwrap();
+ assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
+ assert_eq!(rt.batches.unwrap().len(), 0);
+
+ // Predicate should prune no row groups
+ let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1))));
+ let rt = RoundTrip::new()
+ .with_predicate(filter)
+ .with_schema(schema)
+ .round_trip(vec![batch])
+ .await;
+ // There should be no predicate evaluation errors
+ let metrics = rt.parquet_exec.metrics().unwrap();
+ assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
+ let read = rt
+ .batches
+ .unwrap()
+ .iter()
+ .map(|b| b.num_rows())
+ .sum::<usize>();
+ assert_eq!(read, 2, "Expected 2 rows to match the predicate");
+ }
+
#[tokio::test]
async fn evolved_schema_disjoint_schema_filter() {
let c1: ArrayRef =
@@ -1748,6 +1846,7 @@ mod tests {
let rt = RoundTrip::new()
.with_predicate(filter.clone())
.with_pushdown_predicate()
+ .with_bloom_filters()
.round_trip(vec![batch1])
.await;
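For readers following the test changes above: the new with_bloom_filters toggle composes with the existing RoundTrip builder like the other options. A minimal sketch of the harness usage, assembled from the hunks in this diff (batch construction elided; get_value and rt.parquet_exec are the existing test helpers):

    // Scan with filter pushdown and bloom filters enabled on read
    let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("foo".to_string()))));
    let rt = RoundTrip::new()
        .with_predicate(filter)
        .with_pushdown_predicate()
        .with_bloom_filters()
        .round_trip(vec![batch1])
        .await;
    // The scan should complete without predicate evaluation errors
    let metrics = rt.parquet_exec.metrics().unwrap();
    assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);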
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 376b403361..9e14425074 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -55,8 +55,9 @@ pub(super) struct ParquetOpener {
pub limit: Option<usize>,
/// Optional predicate to apply during the scan
pub predicate: Option<Arc<dyn PhysicalExpr>>,
- /// Schema of the output table
- pub table_schema: SchemaRef,
+ /// Schema of the output table without partition columns.
+ /// This is the schema we coerce the physical file schema into.
+ pub logical_file_schema: SchemaRef,
/// Optional hint for how large the initial request to read parquet metadata
/// should be
pub metadata_size_hint: Option<usize>,
@@ -104,13 +105,13 @@ impl FileOpener for ParquetOpener {
let batch_size = self.batch_size;
let projected_schema =
- SchemaRef::from(self.table_schema.project(&self.projection)?);
+ SchemaRef::from(self.logical_file_schema.project(&self.projection)?);
let schema_adapter_factory = Arc::clone(&self.schema_adapter_factory);
let schema_adapter = self
.schema_adapter_factory
- .create(projected_schema, Arc::clone(&self.table_schema));
+ .create(projected_schema, Arc::clone(&self.logical_file_schema));
let predicate = self.predicate.clone();
- let table_schema = Arc::clone(&self.table_schema);
+ let logical_file_schema = Arc::clone(&self.logical_file_schema);
let reorder_predicates = self.reorder_filters;
let pushdown_filters = self.pushdown_filters;
let coerce_int96 = self.coerce_int96;
@@ -141,17 +142,20 @@ impl FileOpener for ParquetOpener {
.await?;
// Note about schemas: we are actually dealing with **3 different schemas** here:
- // - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc.
- // - The "virtual" file schema: this is the table schema minus any
hive partition columns and projections. This is what the file schema is coerced
to.
+ // - The table schema as defined by the TableProvider.
+ // This is what the user sees, what they get when they `SELECT * FROM table`, etc.
+ // - The logical file schema: this is the table schema minus any hive partition columns and projections.
+ // This is what the physical file schema is coerced to.
// - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains.
let mut physical_file_schema = Arc::clone(reader_metadata.schema());
// The schema loaded from the file may not be the same as the
// desired schema (for example if we want to instruct the parquet
// reader to read strings using Utf8View instead). Update if necessary
- if let Some(merged) =
- apply_file_schema_type_coercions(&table_schema, &physical_file_schema)
- {
+ if let Some(merged) = apply_file_schema_type_coercions(
+ &logical_file_schema,
+ &physical_file_schema,
+ ) {
physical_file_schema = Arc::new(merged);
options = options.with_schema(Arc::clone(&physical_file_schema));
reader_metadata = ArrowReaderMetadata::try_new(
@@ -178,7 +182,7 @@ impl FileOpener for ParquetOpener {
// Build predicates for this specific file
let (pruning_predicate, page_pruning_predicate) =
build_pruning_predicates(
predicate.as_ref(),
- &physical_file_schema,
+ &logical_file_schema,
&predicate_creation_errors,
);
@@ -215,7 +219,7 @@ impl FileOpener for ParquetOpener {
let row_filter = row_filter::build_row_filter(
&predicate,
&physical_file_schema,
- &table_schema,
+ &logical_file_schema,
builder.metadata(),
reorder_predicates,
&file_metrics,
diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index cd0cbf087f..cde9e56c92 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -426,7 +426,7 @@ fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<boo
pub fn build_row_filter(
expr: &Arc<dyn PhysicalExpr>,
physical_file_schema: &SchemaRef,
- table_schema: &SchemaRef,
+ logical_file_schema: &SchemaRef,
metadata: &ParquetMetaData,
reorder_predicates: bool,
file_metrics: &ParquetFileMetrics,
@@ -447,7 +447,7 @@ pub fn build_row_filter(
FilterCandidateBuilder::new(
Arc::clone(expr),
Arc::clone(physical_file_schema),
- Arc::clone(table_schema),
+ Arc::clone(logical_file_schema),
Arc::clone(schema_adapter_factory),
)
.build(metadata)
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 13684db8ea..69347f440c 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -481,7 +481,7 @@ impl FileSource for ParquetSource {
.expect("Batch size must set before creating ParquetOpener"),
limit: base_config.limit,
predicate: self.predicate.clone(),
- table_schema: Arc::clone(&base_config.file_schema),
+ logical_file_schema: Arc::clone(&base_config.file_schema),
metadata_size_hint: self.metadata_size_hint,
metrics: self.metrics().clone(),
parquet_file_reader_factory,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]