Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
adriangb commented on PR #16461: URL: https://github.com/apache/datafusion/pull/16461#issuecomment-3006801288 Great, I'll address https://github.com/apache/datafusion/pull/16461#discussion_r2159713329 and then I think this will be ready to merge! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
kosiew commented on PR #16461: URL: https://github.com/apache/datafusion/pull/16461#issuecomment-3006767151 @adriangb > @kosiew any objections to merging this? Nope. I am excited to see the solution to the puzzle.
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
adriangb commented on PR #16461: URL: https://github.com/apache/datafusion/pull/16461#issuecomment-3006603280 @kosiew any objections to merging this?
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
adriangb commented on PR #16461: URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2998806438 https://github.com/apache/datafusion/pull/16530 can easily be incorporated into this work, completely eliminating what are currently expensive casts.
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
adriangb commented on code in PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#discussion_r2162856077
##
datafusion/physical-expr/src/schema_rewriter.rs:
##
@@ -0,0 +1,318 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Physical expression schema rewriting utilities
+
+use std::sync::Arc;
+
+use arrow::compute::can_cast_types;
+use arrow::datatypes::{FieldRef, Schema};
+use datafusion_common::{
+    exec_err,
+    tree_node::{Transformed, TransformedResult, TreeNode},
+    Result, ScalarValue,
+};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+
+use crate::expressions::{self, CastExpr, Column};
+
+/// Builder for rewriting physical expressions to match different schemas.
+///
+/// # Example
+///
+/// ```rust
+/// use datafusion_physical_expr::schema_rewriter::PhysicalExprSchemaRewriter;
+/// use arrow::datatypes::Schema;
+///
+/// # fn example(
+/// #     predicate: std::sync::Arc<dyn datafusion_physical_expr::PhysicalExpr>,
+/// #     physical_file_schema: &Schema,
+/// #     logical_file_schema: &Schema,
+/// # ) -> datafusion_common::Result<()> {
+/// let rewriter = PhysicalExprSchemaRewriter::new(physical_file_schema, logical_file_schema);
+/// let adapted_predicate = rewriter.rewrite(predicate)?;
+/// # Ok(())
+/// # }
+/// ```
+pub struct PhysicalExprSchemaRewriter<'a> {
+    physical_file_schema: &'a Schema,
+    logical_file_schema: &'a Schema,
+    partition_fields: Vec<FieldRef>,
+    partition_values: Vec<ScalarValue>,
+}
+
+impl<'a> PhysicalExprSchemaRewriter<'a> {
+    /// Create a new schema rewriter with the given schemas
+    pub fn new(
+        physical_file_schema: &'a Schema,
+        logical_file_schema: &'a Schema,
+    ) -> Self {
+        Self {
+            physical_file_schema,
+            logical_file_schema,
+            partition_fields: Vec::new(),
+            partition_values: Vec::new(),
+        }
+    }
+
+    /// Add partition columns and their corresponding values
+    ///
+    /// When a column reference matches a partition field, it will be replaced
+    /// with the corresponding literal value from partition_values.
+    pub fn with_partition_columns(
+        mut self,
+        partition_fields: Vec<FieldRef>,
+        partition_values: Vec<ScalarValue>,
+    ) -> Self {
+        self.partition_fields = partition_fields;
+        self.partition_values = partition_values;
+        self
+    }
+
+    /// Rewrite the given physical expression to match the target schema
+    ///
+    /// This method applies the following transformations:
+    /// 1. Replaces partition column references with literal values
+    /// 2. Handles missing columns by inserting null literals
+    /// 3. Casts columns when logical and physical schemas have different types
+    pub fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
+        expr.transform(|expr| self.rewrite_expr(expr)).data()
+    }
+
+    fn rewrite_expr(
+        &self,
+        expr: Arc<dyn PhysicalExpr>,
+    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+        if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+            return self.rewrite_column(Arc::clone(&expr), column);
+        }
+
+        Ok(Transformed::no(expr))
+    }
+
+    fn rewrite_column(
+        &self,
+        expr: Arc<dyn PhysicalExpr>,
+        column: &Column,
+    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+        // Get the logical field for this column
+        let logical_field = match self.logical_file_schema.field_with_name(column.name()) {
+            Ok(field) => field,
+            Err(e) => {
+                // If the column is a partition field, we can use the partition value
+                if let Some(partition_value) = self.get_partition_value(column.name()) {
+                    return Ok(Transformed::yes(expressions::lit(partition_value)));
+                }
+                // If the column is not found in the logical schema and is not a partition
+                // value, return an error. This should probably never be hit unless something
+                // upstream broke, but nonetheless it's better for us to return a handleable
+                // error than to panic / do something unexpected.
+                return Err(e.into());
+            }
+        };
+
+        // Check if the column exists in the physical schema
+        let physical_column_index =
+            match self.physica
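The three rewrite rules in the diff above (partition columns become literal values, missing columns become NULL literals, and type mismatches get casts) can be sketched with a std-only mock. `Expr`, `rewrite_column`, and the schemas-as-maps below are hypothetical stand-ins for illustration, not the PR's actual types:

```rust
// Hypothetical std-only mock of the rewrite rules; not the PR's API.
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(Option<i64>),          // None models a NULL literal
    Cast(Box<Expr>, &'static str), // target type name
}

fn rewrite_column(
    name: &str,
    physical: &HashMap<String, &'static str>, // column -> on-disk type
    logical: &HashMap<String, &'static str>,  // column -> table type
    partitions: &HashMap<String, i64>,        // partition column -> value
) -> Expr {
    // 1. Partition columns are replaced by their literal values
    if let Some(v) = partitions.get(name) {
        return Expr::Literal(Some(*v));
    }
    let logical_ty = logical[name];
    match physical.get(name) {
        // 2. Missing in the file: substitute a NULL literal
        None => Expr::Literal(None),
        // 3. Type mismatch: wrap the column in a cast to the logical type
        Some(ty) if *ty != logical_ty => {
            Expr::Cast(Box::new(Expr::Column(name.to_string())), logical_ty)
        }
        // Otherwise keep the column reference untouched
        Some(_) => Expr::Column(name.to_string()),
    }
}
```

In the real rewriter these decisions happen inside a `TreeNode::transform` walk, so nested expressions (`a = 1 AND b > 2`) get each `Column` leaf rewritten independently.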
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
adriangb commented on PR #16461: URL: https://github.com/apache/datafusion/pull/16461#issuecomment-3004630417 Thank you very much for the feedback @kosiew! I don't mean to disregard it; you make great points, but I think they are surmountable! Let's move forward with this and keep iterating on the approaches in parallel. If it turns out that this approach won't work for projection evaluation, it's still clearly a win for predicate evaluation.
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
adriangb commented on PR #16461: URL: https://github.com/apache/datafusion/pull/16461#issuecomment-3004580959
> Every invocation in the evaluator will loop over rows to build child arrays, then pack them into a StructArray

As far as I know a PhysicalExpr can operate at the array level. For example `lit(ScalarValue::Null).into_array(N)` will end up calling `new_null_array` as well after a couple of function call hops: https://github.com/apache/datafusion/blob/e3d3302161d382b9219c4536ad5ec0ce93690ba8/datafusion/common/src/scalar/mod.rs#L2672

I think 3-4 function call hops would be an issue if it did happen for every row, but it's happening at the array level, so it's going to be inconsequential compared to the IO happening, etc.

> There's no built-in "struct constructor" expression in DataFusion

Isn't there https://datafusion.apache.org/user-guide/sql/scalar_functions.html#struct? I'm sure we can call that ourselves without SQL: https://github.com/apache/datafusion/blob/e3d3302161d382b9219c4536ad5ec0ce93690ba8/datafusion/functions/src/core/struct.rs#L53-L71
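The array-level point above can be illustrated with a std-only mock of the broadcast behavior. This `ColumnarValue` is a hypothetical stand-in named after (but much simpler than) DataFusion's enum: a scalar turns into an array with one bulk allocation, not one call per row:

```rust
// Std-only sketch of the scalar-vs-array distinction; illustrative, not DataFusion's type.
#[derive(Debug, Clone, PartialEq)]
enum ColumnarValue {
    Scalar(Option<i64>),     // e.g. lit(ScalarValue::Null)
    Array(Vec<Option<i64>>), // e.g. a column read from the file
}

impl ColumnarValue {
    fn into_array(self, num_rows: usize) -> Vec<Option<i64>> {
        match self {
            ColumnarValue::Array(a) => a,
            // One bulk allocation, analogous to new_null_array for a NULL scalar
            ColumnarValue::Scalar(v) => vec![v; num_rows],
        }
    }
}
```

The constant per-expression overhead (a few function calls) is paid once per batch, which is why it is dwarfed by IO.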
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
kosiew commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-3003420880
hi @adriangb
> could you take a look at https://github.com/apache/datafusion/commit/32725dd621ec5e96caf1970433f3549dca977a80?

The new tests in `PhysicalExprSchemaRewriter`'s suite, including your `test_adapt_batches` example, *do* demonstrate that we can:
1. **Rewrite** a projection (or filter) against a physical `RecordBatch`,
2. **Evaluate** those rewritten `PhysicalExpr`s on the old batch to
3. **Produce** a brand-new `RecordBatch` that (a) injects nulls for missing top-level columns, (b) applies casts, and (c) drops extra columns, all in one go.

So yes, for **flat** schemas and simple projections/filters, we can entirely sidestep a separate `map_batch` / `cast_struct_column` step by:
- Generating an expression per target column (either `col(...)` if present or `lit(NULL)` if absent),
- Letting the engine "evaluate" those expressions into new arrays, and
- Bundling them into a fresh `RecordBatch`.

✅ **Where the rewrite-only approach shines**
- **Simplicity** for top-level columns. We only need `col` + `lit(NULL)` + `CastExpr`.
- **Unified code path**: predicates *and* projections both go through the same rewriter + evaluator.
- **Less bespoke iterator logic**: no custom `StructArray` walks, no recursive field-matching loops.

---

⚠️ **Where the schema-adapter approach still wins**
1. **Deeply nested structs**
   - There's *no* built-in "struct constructor" expression in DataFusion's evaluator that I know of.
   - Our rewrite + `batch_project` hack only handles top-level arrays. We can't easily say *"build a `StructArray` whose fields are (`col("a.b")`, `lit(NULL)`, `cast(col("a.c"))`, ...)"* purely in expression form.
2. **Performance**
   - Expression evaluation involves building `ArrayRef`s by walking millions of rows through the `PhysicalExpr` vtable.
   - The adapter's `cast_struct_column` does one recursive scan through each `StructArray`'s memory, which is far more cache-friendly for bulk columnar operations.
3. **Full schema fidelity**
   - The rewrite test only demonstrates:
     - *Dropping* "extra" columns,
     - *Injecting null* for missing *top-level* columns,
     - *Casting* primitive types.
   - It doesn't cover:
     - **Adding** a new nested struct (we'd need to build that `StructArray` via expressions we don't have),
     - **Recursively** updating sub-children,
     - **Preserving** null bit-maps across nested levels.

> > Complex handling for deeply nested types.
>
> I do think this is a concern, I'm not sure how hard it would be to actually implement, but it's theoretically very possible

### Why it's possible but a lot of work (and a performance risk)
1. **Engineering effort**
   - We'll be growing the rewriter from a column/cast replacer into a full recursive schema-walker + struct-node constructor.
   - We'll have to handle every corner case: non-nullable nested fields, mixed present+missing children, ordering of fields, metadata, etc.
2. **Runtime performance**
   - Every invocation in the evaluator will loop over *rows* to build child arrays, then pack them into a `StructArray`.
   - That's orders of magnitude slower than a tight `cast_struct_column` implementation that does one bulk pass through the existing `StructArray` buffers.

I hope I don't sound like I am dead against the rewrite approach. It is more like you have shown me a puzzle that I don't know how to solve.

### What I would love to hear
"Here's a simpler and faster approach."
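The "expression per target column" recipe discussed above can be sketched as a std-only mock that models a batch as named vectors. `adapt_batch` and the batch model are hypothetical illustrations, not DataFusion APIs:

```rust
// Hypothetical sketch: one "expression" per logical field, evaluated into a new batch.
use std::collections::HashMap;

type Column = Vec<Option<i64>>;

fn adapt_batch(
    physical: &HashMap<String, Column>, // columns as read from the file
    logical_fields: &[&str],            // target (table) schema, in order
    num_rows: usize,
) -> Vec<(String, Column)> {
    logical_fields
        .iter()
        .map(|name| {
            let values = match physical.get(*name) {
                Some(col) => col.clone(),     // like evaluating col("name")
                None => vec![None; num_rows], // like lit(NULL) broadcast to the batch
            };
            (name.to_string(), values)
        })
        .collect() // physical columns absent from logical_fields are simply dropped
}
```

This captures the flat-schema case the comment concedes works well; the open question above is doing the same for nested struct children.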
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
adriangb commented on PR #16461: URL: https://github.com/apache/datafusion/pull/16461#issuecomment-3000255690
@kosiew I'm not sure I agree with the conclusions there. Why can't we use expressions to do the schema adapting during the scan? It's very possible, as @alamb pointed out in https://github.com/apache/datafusion/pull/16461#issuecomment-2997870791, to feed a RecordBatch into an expression and get back a new array. So unless I'm missing something I don't think these are correct:
> Expression rewriting is great for pushdown but batch-level adapters are needed for correct, shaped data.
> No effect on RecordBatch structure.
> Limited scope (only predicates and pruning).
> Possibly poorer performance due to repeated expression rewrites.

There are no more expression rewrites than there are SchemaAdapters created. Those aren't cached either and are created for each file. I'll put together an example to show how predicate rewrites can be used to reshape data. But also FWIW that's exactly how ProjectionExec works.
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
adriangb commented on PR #16461: URL: https://github.com/apache/datafusion/pull/16461#issuecomment-3000698443
@kosiew could you take a look at 32725dd?
> Complex handling for deeply nested types.

I do think this is a concern. I'm not sure how hard it would be to actually implement, but it's theoretically very possible, and I think we should be able to make it easy to implement with some elbow grease / the right helpers and abstractions.
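For the deeply nested case debated above, the adapter-style alternative is a recursive walk over a struct's children. A std-only sketch with a hypothetical `Value` tree (not Arrow's `StructArray`) shows the shape of that recursion:

```rust
// Std-only sketch of a recursive struct walk that fills missing child fields
// with nulls; `Value` and `adapt_struct` are illustrative, not Arrow/DataFusion types.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Int(Option<i64>),
    Struct(Vec<(String, Value)>),
}

fn adapt_struct(value: &Value, target_fields: &[&str]) -> Value {
    match value {
        Value::Struct(children) => Value::Struct(
            target_fields
                .iter()
                .map(|name| {
                    let child = children
                        .iter()
                        .find(|(n, _)| n == name)
                        .map(|(_, v)| v.clone())
                        // a missing nested field becomes a null leaf
                        .unwrap_or(Value::Int(None));
                    (name.to_string(), child)
                })
                .collect(),
        ),
        other => other.clone(),
    }
}
```

A real implementation would apply the same idea per field of a `StructArray` (recursing into nested structs and preserving null buffers), which is the "elbow grease" the comment refers to.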
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
kosiew commented on PR #16461: URL: https://github.com/apache/datafusion/pull/16461#issuecomment-3000612368 @adriangb, sorry, it was not my intention to presume the conclusions. I do look forward to a solution that handles schema adaptation in one pass.
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
kosiew commented on PR #16461: URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2999138805
Adding notes for future reference:

---

# Summary: Adapting Filter Expressions to File Schema During Parquet Scan

---

## Background & Goal
- Apache DataFusion wants to improve how filter expressions (predicates) and projections are adapted to the **physical file schema** during Parquet scans.
- The effort aims to:
  - Move closer to handling nested struct schema evolution (#15780).
  - Replace older `SchemaAdapter` machinery with a new builder-based approach.
  - Support expression rewrites for projection and selection pushdown (#14993, #15057).
  - Make it easier to work with schema evolution on nested structs (#15821).
  - Enable simpler hooks for handling missing columns and expression transformations.

---

## Key Concepts

### 1. Expression Rewriting (Pushdown Adaptation)
- Rewrites filter and projection expressions to align with the **file's physical schema**.
- Examples:
  - If an expression refers to a nested field `foo.baz` that is missing on disk → rewrite to `lit(NULL)`.
  - If a field has a different physical type on disk vs. the logical schema → add casts.
- This rewriting ensures that predicate pushdown logic and filters do not error out when the *on-disk* schema differs from the *logical* schema.
- Expression rewriting happens **before** reading data and uses the physical schema to safely prune row groups.

### 2. Data Adaptation (Batch-Level Reshaping)
- After reading a `RecordBatch` or arrays from Parquet, reshape them to match the **logical table schema**.
- Actions include:
  - Adding null arrays for missing nested fields (nested struct imputation).
  - Dropping columns no longer part of the logical schema.
  - Recursively casting nested struct types to match the logical type.
- This ensures downstream operators receive data shaped exactly as expected in the query, despite schema evolution.
---

## Main Discussion Points

| Topic | Details |
| --- | --- |
| **Proposed Approach** | Introduce a `PhysicalExprSchemaRewriter` builder to adapt expressions to file schema during pruning/scanning. |
| **Nested Struct Imputation** | Expression-only rewrites are limited for nested structs since they do not modify the actual data arrays. |
| **Data vs. Expression Adaptation** | Expression rewriting is great for pushdown but batch-level adapters are needed for correct, shaped data. |
| **Complementary Approach** | Use expression rewriting for filters/projections + array-level adapters (e.g. `cast_struct_column`) to reshape in-memory data. |
| **Projection Pushdown Scenario** | A scan can receive full projection expressions (e.g. `a + b`), which get adapted and evaluated on a RecordBatch, producing final output. |
| **Risks of Expression-Only Rewrites** | No effect on RecordBatch structure. Limited scope (only predicates and pruning). Risk of code duplication. Complex handling for deeply nested types. Possibly poorer performance due to repeated expression rewrites. |
| **Potential Benefits of Expression-Based Rewrites** | Cleaner pruning path, simpler code, no fake batches for evaluation, reusable visitor pattern. |

---

## Diagram: Data Adaptation vs. Expression Rewriting Flow

```text
+-----------------+   +-----------------+   +------------------+
| Query Logical   |   | File Physical   |   | In-Memory Batch  |
| Schema          |   | Schema          |   | (RecordBatch)    |
| (e.g. table)    |   | (Parquet file)  |   |                  |
+-----------------+   +-----------------+   +------------------+
        |  Expression Rewriting <--- adapts ---
        (remainder of diagram truncated in this archive)
```
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
adriangb commented on PR #16461: URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2998670275 I opened https://github.com/apache/datafusion/issues/16528 to track further ideas / steps
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
alamb commented on code in PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#discussion_r2162492604
##
datafusion/physical-expr/src/schema_rewriter.rs:
##
(quotes the same `schema_rewriter.rs` diff shown in the earlier review comment; the accompanying review text is truncated in this archive)
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
adriangb commented on code in PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#discussion_r2162548231
##
datafusion/physical-expr/src/schema_rewriter.rs:
##
(quotes the same `schema_rewriter.rs` diff shown in the earlier review comment; the accompanying review text is truncated in this archive)
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
alamb commented on PR #16461: URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2997873713 I would personally recommend proceeding in parallel with the two approaches, ensuring there are good end-to-end tests (.slt) -- and then if we find that the projection pushdown / rewriting code can subsume the schema adapter code we could make a PR to do that
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
alamb commented on PR #16461: URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2997870791
> But we'd still need an array-level counterpart to actually materialize those null nested fields in the RecordBatch when we call map_batch.

FWIW I think this is one mechanism to turn the expression into an array (you just need to evaluate the expression into a PhysicalExpr):
- https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.ColumnarValue.html#method.into_array
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
adriangb commented on PR #16461: URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2996225226 Thanks for the thoughtful reply. An important point that I forgot to mention: we're actively working on projection / selection pushdown, which would involve pushing down expressions into the scan, thus we *would* evaluate the expressions and materialize the record batches in the output.
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
kosiew commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2994947013
@adriangb
Thanks for the ping on this.
> Would it be possible to implement the nested struct imputation work you're
doing with this approach?
Do you mean reusing the PhysicalExprSchemaRewriter machinery to drive
nested-struct imputation?
Here are some disadvantages of the rewrite-centric approach versus the more
data-centric adapter approach:
- Expression-only, not data-level: This never actually transforms the
underlying RecordBatch columns: if downstream logic (or the user) inspects a
struct column directly, they won't see the new null fields injected. We'd still
need array-level imputation for correctness in the result batches.
- Limited to predicate contexts: The rewriter hooks into filter and pruning,
but our broader schema-evolution needs (e.g. reading all columns, SELECT *,
writing out evolved nested structs) fall outside its scope.
- Duplication risk: We end up reinventing part of the schema adapter's
compatibility logic (matching fields by name, casting types) inside the
rewriter, which can drift from the adapter's rules over time.
- Complexity with deep nesting: Recursively handling deeply nested structs
inside an expression tree, and ensuring every nested-field access gets
rewritten with the right shape, quickly becomes more intricate than a simple
tree visitor.
- Performance implications: Constantly rewriting and reconstructing
expression trees (and then evaluating those casts/lits) might be less efficient
than bulk array-level casts + struct builds, especially on wide tables.
So, could we bolt nested-struct imputation onto this rewriter? Technically
yes: we could extend rewrite_column so that, whenever we see a Column referring
to foo.bar.baz that's missing in the physical schema, we generate a
Literal::Null of the full nested type (constructing the proper StructValue).
But we'd still need an array-level counterpart to actually materialize those
null nested fields in the RecordBatch when we call map_batch.
In practice, the two approaches complement each other:
- Use the rewriter to handle predicate and projection expressions (so filters
and column references don't blow up).
- Continue to rely on cast_struct_column + NestedStructSchemaAdapter to adapt
the actual batch data, filling in null arrays and doing recursive casts.
That way we get the best of both worlds: clean, centralized expression
rewriting for pushdown, and robust array-level marshalling for the final
tables.
## Why the two-pronged approach makes sense
1. Pushdown vs. Data Adaptation Are Different Concerns
The PhysicalExprSchemaRewriter is perfect for rewriting predicates and
projections so they don't blow up when the on-disk schema diverges.
But once you've read that Parquet row group into memory, you still need to
reshape the StructArray itself: filling in null arrays for new nested fields,
dropping old ones, recursively casting types.
2. Keeping Pruning Code Lean
Swapping out the old SchemaMapper for the rewriter in your pruning path is a
great win: much less boilerplate, better separation of concerns, and no more
"fake record batches" just to evaluate a filter.
You can remove all of that pruning-specific adapter code and lean on the
rewriter's tree visitor.
3. Deep Schema-Evolution Still Lives in the Adapter
Handling a top-level missing column is easy in the rewriter (you just
rewrite col("foo") to lit(NULL)), but handling col("a.b.c"), where b itself is
a new struct and c is a new field inside it, is far more natural in a
recursive cast_struct_column that operates on StructArray children.
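To make the rewrite-side idea above concrete, here is a minimal stdlib-only sketch: a column that is missing from the physical schema is rewritten into a typed NULL literal. The `Expr` enum and `rewrite_missing` function are hypothetical simplifications for illustration, not DataFusion APIs; the real implementation works on `PhysicalExpr` trees and Arrow types.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified expression tree: just columns, literals, and one
/// comparison. A real rewriter would walk Arc<dyn PhysicalExpr> nodes.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),      // dotted path, e.g. "foo.bar.baz"
    NullLiteral(String), // carries the name of the logical data type
    Int(i64),
    Gt(Box<Expr>, Box<Expr>),
}

/// Rewrite any column absent from the physical schema into a typed NULL
/// literal, mirroring what a predicate rewriter would do for missing fields.
fn rewrite_missing(expr: Expr, physical: &HashSet<String>, logical_type: &str) -> Expr {
    match expr {
        Expr::Column(path) if !physical.contains(&path) => {
            Expr::NullLiteral(logical_type.to_string())
        }
        Expr::Gt(l, r) => Expr::Gt(
            Box::new(rewrite_missing(*l, physical, logical_type)),
            Box::new(rewrite_missing(*r, physical, logical_type)),
        ),
        other => other,
    }
}

fn main() {
    let physical: HashSet<String> = ["foo.a".to_string()].into_iter().collect();
    // col("foo.bar.baz") > 5, where foo.bar.baz is missing on disk
    let pred = Expr::Gt(
        Box::new(Expr::Column("foo.bar.baz".into())),
        Box::new(Expr::Int(5)),
    );
    let rewritten = rewrite_missing(pred, &physical, "Int64");
    println!("{rewritten:?}");
}
```

Note that this only fixes the expression; as the comment above points out, the in-memory batch still needs a separate array-level fill.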
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
kosiew commented on PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2994965537
Putting more words to how I understand pushdown and data adaptation:
1. Pushdown: "Which rows or pages should I read?"
- Input: your original predicate (e.g. col("foo.b") > 5) and the physical
Parquet schema.
- What the rewriter does:
  - Sees that foo.b doesn't exist on disk → replaces col("foo.b") > 5 with
lit(NULL) > 5.
  - Or, if foo.a is stored as Int32 but the table expects Int64, it wraps
col("foo.a") in a cast.
- Result: you get a "safe" predicate that Parquet can evaluate against
row-group statistics or pages without error.
- Outcome: you prune away unneeded row groups, or skip pages, based on that
rewritten expression.
At the end of this step, no data has actually been materialized: you've only
modified the expression you use to decide what to read.
2. Data adaptation: "How do I shape the in-memory batch to match the
logical schema?"
- Input: a RecordBatch (or StructArray) that you read directly from Parquet.
  - This batch is laid out exactly as on disk: it only has the columns that
existed in that file's schema, and nested structs only contain the old fields.
- What the adapter does (map_batch / cast_struct_column):
  - Field matching: for each field in your logical (table) schema, look it
up by name in the batch's arrays.
  - Missing fields → insert a new_null_array(...) of the right datatype and
row count.
  - Extra fields (present on disk but dropped in the table) → ignore them.
  - Nested structs → recurse into child struct arrays, doing the same
match/fill/ignore/cast logic at each level.
- Result: a brand-new StructArray (and overall RecordBatch) whose columns
exactly line up with your table schema, even for deeply nested new fields.
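The match/fill/ignore steps of data adaptation can be sketched with plain Rust collections standing in for Arrow arrays. `adapt_batch` below is a hypothetical stand-in for map_batch: it aligns a file batch's columns with the logical schema by name, fills missing fields with nulls, and drops extras; real code would build Arrow arrays and recurse into struct children.

```rust
use std::collections::HashMap;

/// Align a file batch (name -> column of nullable values) with the logical
/// schema: missing fields become all-null columns, extra fields are dropped.
fn adapt_batch(
    logical_fields: &[&str],
    file_batch: &HashMap<String, Vec<Option<i64>>>,
    num_rows: usize,
) -> Vec<(String, Vec<Option<i64>>)> {
    logical_fields
        .iter()
        .map(|name| {
            let col = file_batch
                .get(*name)
                .cloned()
                // missing on disk -> fill with nulls, like new_null_array(...)
                .unwrap_or_else(|| vec![None; num_rows]);
            (name.to_string(), col)
        })
        .collect()
}

fn main() {
    let mut batch = HashMap::new();
    batch.insert("a".to_string(), vec![Some(1), Some(2)]);
    batch.insert("dropped".to_string(), vec![Some(9), Some(9)]); // extra field
    let adapted = adapt_batch(&["a", "b"], &batch, 2);
    assert_eq!(adapted[1].1, vec![None, None]); // "b" filled with nulls
    println!("{adapted:?}");
}
```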
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
adriangb commented on PR #16461: URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2994216351 @kosiew I'm curious what you think about this. Would it be possible to implement the nested struct imputation work you're doing with this approach?
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
adriangb commented on PR #16461: URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2994242282 Just for fun I opened https://github.com/pydantic/datafusion/pull/31 to see how hard it would be to incorporate https://github.com/apache/datafusion/issues/15780#issuecomment-2824716928, not too bad! Most of the code could be shared with the logical layer with some refactoring, `pub`ing, etc.
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
adriangb commented on PR #16461: URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2994214075 My hope with this work is that we can: - Make it easier to do further optimizations like https://github.com/apache/datafusion/issues/15780#issuecomment-2824716928 - Replace the existing SchemaAdapter machinery - Add hooks to this builder for `with_missing_column_handling(...)` to close #15261 and `visit_expression(f: Fn(Arc<dyn PhysicalExpr>, &Schema) -> Transformed<Arc<dyn PhysicalExpr>>)` to close #15057 - Make it easier to work #15821 in because we scan at the file schema
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
adriangb commented on PR #16461: URL: https://github.com/apache/datafusion/pull/16461#issuecomment-2993667691 @alamb I've created the builder, moved the implementation and added some unit tests
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
adriangb commented on code in PR #16461: URL: https://github.com/apache/datafusion/pull/16461#discussion_r2159712965 ## datafusion/datasource-parquet/src/opener.rs: ## @@ -524,6 +539,84 @@ fn should_enable_page_index( .unwrap_or(false) } +use datafusion_physical_expr::expressions; + +/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that +/// is cast to the specified data type. +/// Preference is always given to casting literal values to the data type of the column +/// since casting the column to the literal value's data type can be significantly more expensive. +/// Given two columns the cast is applied arbitrarily to the first column. +pub fn cast_expr_to_schema( Review Comment: Sounds good to me, I will work on this next! Agreed on the unit tests 👍
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
adriangb commented on code in PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#discussion_r2159713329
##
datafusion/datasource-parquet/src/row_filter.rs:
##
@@ -520,111 +489,15 @@ mod test {
let expr = col("int64_list").is_not_null();
let expr = logical2physical(&expr, &table_schema);
-let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
let table_schema = Arc::new(table_schema.clone());
-let candidate = FilterCandidateBuilder::new(
-expr,
-table_schema.clone(),
-table_schema,
-schema_adapter_factory,
-)
-.build(metadata)
-.expect("building candidate");
+let candidate = FilterCandidateBuilder::new(expr, table_schema.clone())
+.build(metadata)
+.expect("building candidate");
assert!(candidate.is_none());
}
-#[test]
-fn test_filter_type_coercion() {
Review Comment:
I think so - I'll have to make it more e2e since it is no longer specific to
the row filter. We have some other similar tests, I'll work this into there
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
alamb commented on code in PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#discussion_r2159679800
##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -524,6 +539,84 @@ fn should_enable_page_index(
.unwrap_or(false)
}
+use datafusion_physical_expr::expressions;
+
+/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that
+/// is cast to the specified data type.
+/// Preference is always given to casting literal values to the data type of the column
+/// since casting the column to the literal value's data type can be significantly more expensive.
+/// Given two columns the cast is applied arbitrarily to the first column.
+pub fn cast_expr_to_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    physical_file_schema: &Schema,
+    logical_file_schema: &Schema,
+    partition_values: Vec<ScalarValue>,
+    partition_fields: &[FieldRef],
+) -> Result<Arc<dyn PhysicalExpr>> {
+    expr.transform(|expr| {
+        if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+            let logical_field = match logical_file_schema.field_with_name(column.name()) {
+                Ok(field) => field,
+                Err(e) => {
+                    // If the column is a partition field, we can use the partition value
+                    for (partition_field, partition_value) in
+                        partition_fields.iter().zip(partition_values.iter())
+                    {
+                        if partition_field.name() == column.name() {
+                            return Ok(Transformed::yes(expressions::lit(
+                                partition_value.clone(),
+                            )));
+                        }
+                    }
+                    // If the column is not found in the logical schema, return an error.
+                    // This should probably never be hit unless something upstream broke,
+                    // but nonetheless it's better for us to return a handleable error
+                    // than to panic / do something unexpected.
+                    return Err(e.into());
+                }
+            };
+            let Ok(physical_field) = physical_file_schema.field_with_name(column.name())
+            else {
+                if !logical_field.is_nullable() {
+                    return exec_err!(
+                        "Non-nullable column '{}' is missing from the physical schema",
+                        column.name()
+                    );
+                }
+                // If the column is missing from the physical schema fill it in with nulls
+                // as `SchemaAdapter` would do.
+                // TODO: do we need to sync this with what the `SchemaAdapter` actually does?
+                // While the default implementation fills in nulls, in theory a custom
+                // `SchemaAdapter` could do something else!
+                let value = ScalarValue::Null.cast_to(logical_field.data_type())?;
+                return Ok(Transformed::yes(expressions::lit(value)));
+            };
+
+            if logical_field.data_type() == physical_field.data_type() {
+                return Ok(Transformed::no(expr));
+            }
+
+            // If the logical field and physical field are different, we need to cast
+            // the column to the logical field's data type.
+            // We will try later to move the cast to literal values if possible, which is
+            // computationally cheaper.
Review Comment:
👍
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
adriangb commented on code in PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#discussion_r2157528156
##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -879,4 +972,107 @@ mod test {
assert_eq!(num_batches, 0);
assert_eq!(num_rows, 0);
}
+
+    #[tokio::test]
+    async fn test_prune_on_partition_value_and_data_value() {
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        // Note: number 3 is missing!
+        let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(4)])).unwrap();
+        let data_size =
+            write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await;
+
+        let file_schema = batch.schema();
+        let mut file = PartitionedFile::new(
+            "part=1/file.parquet".to_string(),
+            u64::try_from(data_size).unwrap(),
+        );
+        file.partition_values = vec![ScalarValue::Int32(Some(1))];
+
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("part", DataType::Int32, false),
+            Field::new("a", DataType::Int32, false),
+        ]));
+
+        let make_opener = |predicate| {
+            ParquetOpener {
+                partition_index: 0,
+                projection: Arc::new([0]),
+                batch_size: 1024,
+                limit: None,
+                predicate: Some(predicate),
+                logical_file_schema: file_schema.clone(),
+                metadata_size_hint: None,
+                metrics: ExecutionPlanMetricsSet::new(),
+                parquet_file_reader_factory: Arc::new(
+                    DefaultParquetFileReaderFactory::new(Arc::clone(&store)),
+                ),
+                partition_fields: vec![Arc::new(Field::new(
+                    "part",
+                    DataType::Int32,
+                    false,
+                ))],
+                pushdown_filters: true, // note that this is true!
+                reorder_filters: true,
+                enable_page_index: false,
+                enable_bloom_filter: false,
+                schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
+                enable_row_group_stats_pruning: false, // note that this is false!
+                coerce_int96: None,
+            }
+        };
+
+        let make_meta = || FileMeta {
+            object_meta: ObjectMeta {
+                location: Path::from("part=1/file.parquet"),
+                last_modified: Utc::now(),
+                size: u64::try_from(data_size).unwrap(),
+                e_tag: None,
+                version: None,
+            },
+            range: None,
+            extensions: None,
+            metadata_size_hint: None,
+        };
+
+        // Filter should match the partition value and data value
+        let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1)));
+        let predicate = logical2physical(&expr, &table_schema);
+        let opener = make_opener(predicate);
+        let stream = opener
+            .open(make_meta(), file.clone())
+            .unwrap()
+            .await
+            .unwrap();
+        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
+        assert_eq!(num_batches, 1);
+        assert_eq!(num_rows, 3);
+
+        // Filter should match the partition value but not the data value
+        let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3)));
+        let predicate = logical2physical(&expr, &table_schema);
+        let opener = make_opener(predicate);
+        let stream = opener.open(make_meta(), file.clone()).unwrap().await.unwrap();
+        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
+        assert_eq!(num_batches, 1);
+        assert_eq!(num_rows, 3);
+
+        // Filter should not match the partition value but match the data value
+        let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1)));
+        let predicate = logical2physical(&expr, &table_schema);
+        let opener = make_opener(predicate);
+        let stream = opener.open(make_meta(), file.clone()).unwrap().await.unwrap();
+        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
+        assert_eq!(num_batches, 1);
+        assert_eq!(num_rows, 1);
Review Comment:
This assertion fails on `main`: all 3 rows are passed because the row filter
cannot handle the partition columns. This PR somewhat coincidentally happens to
allow the row filter to handle predicates that depend on partition and data
columns!
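Why the predicate becomes evaluable by the row filter can be sketched with a stdlib-only toy: once partition columns are replaced by their constant partition values (as cast_expr_to_schema does above), the remaining expression references only data columns. The `Expr` enum and `inline_partition_values` function below are hypothetical simplifications, not DataFusion APIs.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified expression tree for illustration.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Int(i64),
    Eq(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

/// Substitute partition columns with their constant partition values so the
/// predicate only references data columns and can run as a row filter.
fn inline_partition_values(expr: Expr, parts: &HashMap<String, i64>) -> Expr {
    match expr {
        Expr::Column(name) => match parts.get(&name) {
            Some(v) => Expr::Int(*v), // partition column -> literal value
            None => Expr::Column(name),
        },
        Expr::Eq(l, r) => Expr::Eq(
            Box::new(inline_partition_values(*l, parts)),
            Box::new(inline_partition_values(*r, parts)),
        ),
        Expr::Or(l, r) => Expr::Or(
            Box::new(inline_partition_values(*l, parts)),
            Box::new(inline_partition_values(*r, parts)),
        ),
        other => other,
    }
}

fn main() {
    let parts: HashMap<String, i64> = [("part".to_string(), 1)].into();
    // part = 2 OR a = 1, as in the last case of the test above
    let pred = Expr::Or(
        Box::new(Expr::Eq(Box::new(Expr::Column("part".into())), Box::new(Expr::Int(2)))),
        Box::new(Expr::Eq(Box::new(Expr::Column("a".into())), Box::new(Expr::Int(1)))),
    );
    let inlined = inline_partition_values(pred, &parts);
    // "part" is now the literal 1, so the filter reduces to 1 = 2 OR a = 1
    println!("{inlined:?}");
}
```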
Re: [PR] adapt filter expressions to file schema during parquet scan [datafusion]
adriangb commented on code in PR #16461:
URL: https://github.com/apache/datafusion/pull/16461#discussion_r2157496405
##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -524,6 +532,62 @@ fn should_enable_page_index(
.unwrap_or(false)
}
+use datafusion_physical_expr::expressions;
+
+/// Given a [`PhysicalExpr`] and a [`SchemaRef`], returns a new [`PhysicalExpr`] that
+/// is cast to the specified data type.
+/// Preference is always given to casting literal values to the data type of the column
+/// since casting the column to the literal value's data type can be significantly more expensive.
+/// Given two columns the cast is applied arbitrarily to the first column.
+pub fn cast_expr_to_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    physical_file_schema: &Schema,
+    logical_file_schema: &Schema,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    expr.transform(|expr| {
+        if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+            let logical_field = logical_file_schema.field_with_name(column.name())?;
+            let Ok(physical_field) = physical_file_schema.field_with_name(column.name())
+            else {
+                if !logical_field.is_nullable() {
+                    return exec_err!(
+                        "Non-nullable column '{}' is missing from the physical schema",
+                        column.name()
+                    );
Review Comment:
Might be useful to include some sort of file identifier here?
