Re: [PR] Per file filter evaluation [datafusion]

2025-07-15 Thread via GitHub


alamb commented on PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#issuecomment-3074155476

   Thank you @adriangb 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Per file filter evaluation [datafusion]

2025-07-15 Thread via GitHub


alamb merged PR #15057:
URL: https://github.com/apache/datafusion/pull/15057




Re: [PR] Per file filter evaluation [datafusion]

2025-07-15 Thread via GitHub


adriangb commented on PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#issuecomment-3073986907

   I plan on merging this once CI passes. It has been approved / reviewed and 
we need the customization of the rewriters for 
https://github.com/apache/datafusion/issues/16235#issuecomment-3067125182. I 
will follow up with PRs to:
   1. Add a no-op / disable rewriter.
   2. Restore the hooks to add a custom SchemaAdapter.
   3. Document in the Changelog how to restore the old behavior if users don't 
want to buy into the new APIs just yet.
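Item 1 in the list above (a no-op / disable rewriter) can be pictured with a dependency-free sketch. The trait and type names here are illustrative stand-ins only, not the real DataFusion `PhysicalExprAdapter` API:

```rust
// Hypothetical model of a "no-op / disable" rewriter: it hands the filter
// expression back unchanged, which is the behavior users would opt into if
// they do not want the new per-file rewriting. `ExprRewriter` and
// `NoopRewriter` are invented names for illustration.
trait ExprRewriter {
    fn rewrite(&self, expr: String) -> String;
}

struct NoopRewriter;

impl ExprRewriter for NoopRewriter {
    // Pass the expression through untouched.
    fn rewrite(&self, expr: String) -> String {
        expr
    }
}

fn main() {
    let r = NoopRewriter;
    println!("{}", r.rewrite("a = 1 AND b > 2".to_string()));
}
```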




Re: [PR] Per file filter evaluation [datafusion]

2025-07-15 Thread via GitHub


adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2207679888


##
datafusion-examples/examples/default_column_values.rs:
##
@@ -0,0 +1,366 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use arrow::array::RecordBatch;
+use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
+use async_trait::async_trait;
+
+use datafusion::assert_batches_eq;
+use datafusion::catalog::memory::DataSourceExec;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion::common::DFSchema;
+use datafusion::common::{Result, ScalarValue};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::utils::conjunction;
+use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_expr::expressions::{CastExpr, Column, Literal};
+use datafusion::physical_expr::schema_rewriter::{
+    DefaultPhysicalExprAdapter, PhysicalExprAdapter,
+};
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{lit, SessionConfig};
+use futures::StreamExt;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+
+// Metadata key for storing default values in field metadata
+const DEFAULT_VALUE_METADATA_KEY: &str = "example.default_value";
+
+// Example showing how to implement custom default value handling for missing columns
+// using field metadata and PhysicalExprAdapter.
+//
+// This example demonstrates how to:
+// 1. Store default values in field metadata using a constant key
+// 2. Create a custom PhysicalExprAdapter that reads these defaults
+// 3. Inject default values for missing columns in filter predicates
+// 4. Use the DefaultPhysicalExprAdapter as a fallback for standard schema adaptation
+// 5. Wrap string default values in cast expressions for proper type conversion
+//
+// Important: PhysicalExprAdapter is specifically designed for rewriting filter predicates
+// that get pushed down to file scans. For handling missing columns in projections,
+// other mechanisms in DataFusion are used (like SchemaAdapter).
+//
+// The metadata-based approach provides a flexible way to store default values as strings
+// and cast them to the appropriate types at query time.
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    println!("=== Creating example data with missing columns and default values ===");
+
+    // Create sample data where the logical schema has more columns than the physical schema
+    let (logical_schema, physical_schema, batch) = create_sample_data_with_defaults();
+
+    let store = InMemory::new();
+    let buf = {
+        let mut buf = vec![];
+
+        let props = WriterProperties::builder()
+            .set_max_row_group_size(2)
+            .build();
+
+        let mut writer =
+            ArrowWriter::try_new(&mut buf, physical_schema.clone(), Some(props))
+                .expect("creating writer");
+
+        writer.write(&batch).expect("Writing batch");
+        writer.close().unwrap();
+        buf
+    };
+    let path = Path::from("example.parquet");
+    let payload = PutPayload::from_bytes(buf.into());
+    store.put(&path, payload).await?;
+
+    // Create a custom table provider that handles missing columns with defaults
+    let table_provider = Arc::new(DefaultValueTableProvider::new(logical_schema));
+
+    // Set up query execution
+    let mut cfg = SessionConfig::new();
+    cfg.options_mut().execution.parquet.pushdown_filters = true;
+    let ctx = SessionContext::new_with_config(cfg);
+
+    // Register our table
+    ctx.register_table("example_table", table_provider)?;
+
+    ctx.runtime_env().
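The excerpt above is truncated in the archive, but its core idea — store a column's default as a string in field metadata under a constant key, then cast it to the column's type at query time — can be sketched without any DataFusion or Arrow dependency. Everything below (`Scalar`, `default_for`, the type-name strings) is an illustrative stand-in, not the real Arrow `DataType`/`CastExpr` machinery:

```rust
use std::collections::HashMap;

// Same metadata key the example uses for string-encoded defaults.
const DEFAULT_VALUE_METADATA_KEY: &str = "example.default_value";

// A tiny stand-in for a typed scalar value (the real example uses
// datafusion's ScalarValue wrapped in a CastExpr).
#[derive(Debug, PartialEq)]
enum Scalar {
    Int32(i32),
    Utf8(String),
}

// Look up the string default in the field's metadata and "cast" it to the
// declared type by parsing. Unknown types yield None, mirroring the idea
// that only supported casts produce a usable default.
fn default_for(metadata: &HashMap<String, String>, data_type: &str) -> Option<Scalar> {
    let raw = metadata.get(DEFAULT_VALUE_METADATA_KEY)?;
    match data_type {
        "Int32" => raw.parse::<i32>().ok().map(Scalar::Int32),
        "Utf8" => Some(Scalar::Utf8(raw.clone())),
        _ => None,
    }
}

fn main() {
    let mut md = HashMap::new();
    md.insert(DEFAULT_VALUE_METADATA_KEY.to_string(), "42".to_string());
    // A missing Int32 column with metadata default "42" resolves to 42.
    println!("{:?}", default_for(&md, "Int32"));
}
```

In the real example this substitution happens inside a custom `PhysicalExprAdapter`, which rewrites pushed-down filter predicates so references to missing columns become literals of the declared type.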

Re: [PR] Per file filter evaluation [datafusion]

2025-07-15 Thread via GitHub


adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2207512528


##
datafusion-examples/examples/default_column_values.rs:
##
(quoted code excerpt identical to the one above; omitted)

Re: [PR] Per file filter evaluation [datafusion]

2025-07-15 Thread via GitHub


adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2207520996


##
datafusion-examples/examples/default_column_values.rs:
##
(quoted code excerpt identical to the one above; omitted)

Re: [PR] Per file filter evaluation [datafusion]

2025-07-11 Thread via GitHub


adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2201901299


##
datafusion-examples/examples/default_column_values.rs:
##
(quoted code excerpt identical to the one above; omitted)

Re: [PR] Per file filter evaluation [datafusion]

2025-07-11 Thread via GitHub


alamb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2201812153


##
datafusion-examples/examples/default_column_values.rs:
##
(quoted code excerpt identical to the one above; omitted)

Re: [PR] Per file filter evaluation [datafusion]

2025-07-10 Thread via GitHub


adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2199354920


##
datafusion-examples/examples/default_column_values.rs:
##
@@ -0,0 +1,366 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use arrow::array::RecordBatch;
+use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
+use async_trait::async_trait;
+
+use datafusion::assert_batches_eq;
+use datafusion::catalog::memory::DataSourceExec;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion::common::DFSchema;
+use datafusion::common::{Result, ScalarValue};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::utils::conjunction;
+use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_expr::expressions::{CastExpr, Column, Literal};
+use datafusion::physical_expr::schema_rewriter::{
+DefaultPhysicalExprAdapter, PhysicalExprAdapter,
+};
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{lit, SessionConfig};
+use futures::StreamExt;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+
+// Metadata key for storing default values in field metadata
+const DEFAULT_VALUE_METADATA_KEY: &str = "example.default_value";
+
+// Example showing how to implement custom default value handling for missing columns
+// using field metadata and PhysicalExprAdapter.
+//
+// This example demonstrates how to:
+// 1. Store default values in field metadata using a constant key
+// 2. Create a custom PhysicalExprAdapter that reads these defaults
+// 3. Inject default values for missing columns in filter predicates
+// 4. Use the DefaultPhysicalExprAdapter as a fallback for standard schema adaptation
+// 5. Wrap string default values in cast expressions for proper type conversion
+//
+// Important: PhysicalExprAdapter is specifically designed for rewriting filter predicates
+// that get pushed down to file scans. For handling missing columns in projections,
+// other mechanisms in DataFusion are used (like SchemaAdapter).
+//
+// The metadata-based approach provides a flexible way to store default values as strings
+// and cast them to the appropriate types at query time.
+
+#[tokio::main]
+async fn main() -> Result<()> {
+println!("=== Creating example data with missing columns and default values ===");
+
+// Create sample data where the logical schema has more columns than the physical schema
+let (logical_schema, physical_schema, batch) = create_sample_data_with_defaults();
+
+let store = InMemory::new();
+let buf = {
+let mut buf = vec![];
+
+let props = WriterProperties::builder()
+.set_max_row_group_size(2)
+.build();
+
+let mut writer =
+ArrowWriter::try_new(&mut buf, physical_schema.clone(), Some(props))
+.expect("creating writer");
+
+writer.write(&batch).expect("Writing batch");
+writer.close().unwrap();
+buf
+};
+let path = Path::from("example.parquet");
+let payload = PutPayload::from_bytes(buf.into());
+store.put(&path, payload).await?;
+
+// Create a custom table provider that handles missing columns with defaults
+let table_provider = Arc::new(DefaultValueTableProvider::new(logical_schema));
+
+// Set up query execution
+let mut cfg = SessionConfig::new();
+cfg.options_mut().execution.parquet.pushdown_filters = true;
+let ctx = SessionContext::new_with_config(cfg);
+
+// Register our table
+ctx.register_table("example_table", table_provider)?;
+
+ctx.runtime_env().

Re: [PR] Per file filter evaluation [datafusion]

2025-07-10 Thread via GitHub


adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2198638535


##
datafusion-examples/examples/default_column_values.rs:
##
@@ -0,0 +1,366 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use arrow::array::RecordBatch;
+use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
+use async_trait::async_trait;
+
+use datafusion::assert_batches_eq;
+use datafusion::catalog::memory::DataSourceExec;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion::common::DFSchema;
+use datafusion::common::{Result, ScalarValue};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::utils::conjunction;
+use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_expr::expressions::{CastExpr, Column, Literal};
+use datafusion::physical_expr::schema_rewriter::{
+DefaultPhysicalExprAdapter, PhysicalExprAdapter,
+};
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{lit, SessionConfig};
+use futures::StreamExt;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+
+// Metadata key for storing default values in field metadata
+const DEFAULT_VALUE_METADATA_KEY: &str = "example.default_value";
+
+// Example showing how to implement custom default value handling for missing columns
+// using field metadata and PhysicalExprAdapter.
+//
+// This example demonstrates how to:
+// 1. Store default values in field metadata using a constant key
+// 2. Create a custom PhysicalExprAdapter that reads these defaults
+// 3. Inject default values for missing columns in filter predicates
+// 4. Use the DefaultPhysicalExprAdapter as a fallback for standard schema adaptation
+// 5. Wrap string default values in cast expressions for proper type conversion
+//
+// Important: PhysicalExprAdapter is specifically designed for rewriting filter predicates
+// that get pushed down to file scans. For handling missing columns in projections,
+// other mechanisms in DataFusion are used (like SchemaAdapter).
+//
+// The metadata-based approach provides a flexible way to store default values as strings
+// and cast them to the appropriate types at query time.
+
+#[tokio::main]
+async fn main() -> Result<()> {
+println!("=== Creating example data with missing columns and default values ===");
+
+// Create sample data where the logical schema has more columns than the physical schema
+let (logical_schema, physical_schema, batch) = create_sample_data_with_defaults();
+
+let store = InMemory::new();
+let buf = {
+let mut buf = vec![];
+
+let props = WriterProperties::builder()
+.set_max_row_group_size(2)
+.build();
+
+let mut writer =
+ArrowWriter::try_new(&mut buf, physical_schema.clone(), Some(props))
+.expect("creating writer");
+
+writer.write(&batch).expect("Writing batch");
+writer.close().unwrap();
+buf
+};
+let path = Path::from("example.parquet");
+let payload = PutPayload::from_bytes(buf.into());
+store.put(&path, payload).await?;
+
+// Create a custom table provider that handles missing columns with defaults
+let table_provider = Arc::new(DefaultValueTableProvider::new(logical_schema));
+
+// Set up query execution
+let mut cfg = SessionConfig::new();
+cfg.options_mut().execution.parquet.pushdown_filters = true;
+let ctx = SessionContext::new_with_config(cfg);
+
+// Register our table
+ctx.register_table("example_table", table_provider)?;
+
+ctx.runtime_env().

Re: [PR] Per file filter evaluation [datafusion]

2025-07-08 Thread via GitHub


alamb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2192262877


##
datafusion-examples/examples/variant_shredding.rs:
##
@@ -0,0 +1,408 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{RecordBatch, StringArray};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+
+use datafusion::assert_batches_eq;
+use datafusion::catalog::memory::DataSourceExec;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::tree_node::{
+Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
+};
+use datafusion::common::{assert_contains, DFSchema, Result};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::utils::conjunction;
+use datafusion::logical_expr::{
+ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
+TableProviderFilterPushDown, TableType, Volatility,
+};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_expr::schema_rewriter::PhysicalExprSchemaRewriteHook;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_expr::{expressions, ScalarFunctionExpr};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{lit, SessionConfig};
+use datafusion::scalar::ScalarValue;
+use futures::StreamExt;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+
+// Example showing how to implement custom filter rewriting for variant shredding.

Review Comment:
   Since this example doesn't actually use variants, it uses JSON, what would you think about calling this example `json_shredding` instead?
   
   I can also help out with adding some more background here and diagrams -- I think the important things to point out are:
   1. This is a technique for speeding up queries on "semi-structured" data such as JSON
   2. It works by materializing ("shredding") one or more columns separately from the JSON to take advantage of columnar storage
   3. However, this requires the query engine to rewrite predicates against the semi-structured column into predicates on shredded columns when available
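   The shredding idea in point 2 can be sketched with a self-contained toy example. This is a hedged illustration only: the `key=value;` row encoding and the `shred` helper stand in for JSON and for the real write-time machinery, and none of these names are DataFusion APIs.

   ```rust
   // Toy "shredding": at write time, materialize one field out of
   // semi-structured rows into its own column so that predicates can be
   // evaluated against a plain columnar array instead of re-parsing every row.
   fn shred<'a>(rows: &[&'a str], key: &str) -> Vec<Option<&'a str>> {
       rows.iter()
           .map(|row| {
               row.split(';').find_map(|pair| {
                   let (k, v) = pair.split_once('=')?;
                   (k == key).then_some(v)
               })
           })
           .collect()
   }

   fn main() {
       // Semi-structured rows, standing in for a JSON column.
       let rows = ["name=Adrian;coins=10", "name=Bea", "name=Cid;coins=3"];
       // Materialized ("shredded") column for the `coins` field.
       let coins = shred(&rows, "coins");
       assert_eq!(coins, vec![Some("10"), None, Some("3")]);
       // A predicate like `coins > 5` now runs on the shredded column only.
       let matches: Vec<usize> = coins
           .iter()
           .enumerate()
           .filter(|(_, v)| v.and_then(|s| s.parse::<i64>().ok()).map_or(false, |n| n > 5))
           .map(|(i, _)| i)
           .collect();
       assert_eq!(matches, vec![0]);
       println!("rows matching coins > 5: {matches:?}");
   }
   ```

   Point 3 is then the engine's job: rewriting a predicate on the semi-structured column into one on the shredded column, which is what the hook in this PR enables.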



##
datafusion-examples/examples/variant_shredding.rs:
##
@@ -0,0 +1,408 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{RecordBatch, StringArray};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+
+use datafusion::assert_batches_eq;
+use datafusion::catalog::memory::DataSourceExec;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::tree_node::{
+Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
+};
+use datafusion::common::{assert_contains, DFSchema, Result};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::log

Re: [PR] Per file filter evaluation [datafusion]

2025-07-07 Thread via GitHub


adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2190141364


##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -233,21 +238,31 @@ impl FileOpener for ParquetOpener {
 }
 }
 
+predicate = predicate
+.map(|p| {
+if let Some(predicate_rewrite_hook) = predicate_rewrite_hook.as_ref()
+{
+predicate_rewrite_hook
+.rewrite(Arc::clone(&p), &physical_file_schema)
+} else {
+Ok(p)
+}
+})
+.transpose()?;

Review Comment:
   It's not clear to me if it's better if this runs before the `PhysicalExprSchemaRewriter` and the simplifier or somehow during it. Running it after doesn't really work because expressions may be simplified away or filled in with nulls (e.g. because it's accessing a shredded variant field that doesn't exist at the table schema level)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] Per file filter evaluation [datafusion]

2025-07-07 Thread via GitHub


adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2190043206


##
datafusion-examples/examples/variant_shredding.rs:
##
@@ -0,0 +1,398 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{RecordBatch, StringArray};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+
+use datafusion::assert_batches_eq;
+use datafusion::catalog::memory::DataSourceExec;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::tree_node::{Transformed, TreeNodeRecursion};
+use datafusion::common::{assert_contains, DFSchema, Result};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::utils::conjunction;
+use datafusion::logical_expr::{
+ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
+TableProviderFilterPushDown, TableType, Volatility,
+};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_expr::schema_rewriter::PhysicalExprSchemaRewriteHook;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_expr::{expressions, ScalarFunctionExpr};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{lit, SessionConfig};
+use datafusion::scalar::ScalarValue;
+use futures::StreamExt;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+
+// Example showing how to implement custom filter rewriting for variant shredding.
+//
+// In this example, we have a table with flat columns using underscore prefixes:
+// data: "...", _data.name: "..."

Review Comment:
   But I think that needs to be its own initiative / PR






Re: [PR] Per file filter evaluation [datafusion]

2025-07-06 Thread via GitHub


adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2188925012


##
datafusion-examples/examples/variant_shredding.rs:
##
@@ -0,0 +1,398 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{RecordBatch, StringArray};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+
+use datafusion::assert_batches_eq;
+use datafusion::catalog::memory::DataSourceExec;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::tree_node::{Transformed, TreeNodeRecursion};
+use datafusion::common::{assert_contains, DFSchema, Result};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::utils::conjunction;
+use datafusion::logical_expr::{
+ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
+TableProviderFilterPushDown, TableType, Volatility,
+};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_expr::schema_rewriter::PhysicalExprSchemaRewriteHook;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_expr::{expressions, ScalarFunctionExpr};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{lit, SessionConfig};
+use datafusion::scalar::ScalarValue;
+use futures::StreamExt;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+
+// Example showing how to implement custom filter rewriting for variant shredding.
+//
+// In this example, we have a table with flat columns using underscore prefixes:
+// data: "...", _data.name: "..."

Review Comment:
   Well as we've now found out in that issue the limitation is actually on DataFusion's side. I think what we need for struct fields specifically is for ParquetOpener to generate the right ProjectionMask based on the expressions having a `get_field(col, 'a')` call. So we'd have:
   1) SQL: `where name = 'Adrian' and get_field(details, 'coins') > 5`
   2) Predicate in Parquet: `BinaryExpr(BinaryExpr(Col(0), Eq, lit('Adrian')), And, BinaryExpr(get_field(Col(1), 'coins'), Gt, lit(5)))`
   3) Rewrite the predicate to `BinaryExpr(BinaryExpr(Col(0), Eq, lit('Adrian')), And, BinaryExpr(Col(1), Gt, lit(5)))` and make the projection mask `ProjectionMask::leaves(vec![0, 2])` or something like that instead of `ProjectionMask::roots`: https://github.com/apache/datafusion/blob/12c40ca9499d23a3459c5f9f7712e9f62b3443a1/datafusion/datasource-parquet/src/row_filter.rs#L126-L129
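   The rewrite step described above can be sketched in a self-contained way. This is a hedged illustration, not DataFusion code: the toy `Expr` enum stands in for `PhysicalExpr`, and the `_data.coins`-style shredded-column naming follows the example's underscore convention.

   ```rust
   use std::collections::HashSet;

   // Toy expression tree standing in for DataFusion's `PhysicalExpr`.
   #[derive(Debug, Clone, PartialEq)]
   enum Expr {
       Column(String),
       Literal(i64),
       GetField(Box<Expr>, String),
       Gt(Box<Expr>, Box<Expr>),
   }

   // Rewrite `GetField(Column(c), f)` into `Column("_c.f")` when the file
   // schema actually contains such a shredded column; otherwise recurse.
   fn rewrite(expr: Expr, file_columns: &HashSet<String>) -> Expr {
       match expr {
           Expr::GetField(inner, field) => {
               if let Expr::Column(name) = inner.as_ref() {
                   let shredded = format!("_{name}.{field}");
                   if file_columns.contains(&shredded) {
                       return Expr::Column(shredded);
                   }
               }
               Expr::GetField(Box::new(rewrite(*inner, file_columns)), field)
           }
           Expr::Gt(l, r) => Expr::Gt(
               Box::new(rewrite(*l, file_columns)),
               Box::new(rewrite(*r, file_columns)),
           ),
           other => other,
       }
   }

   fn main() {
       let file_columns: HashSet<String> =
           ["data".to_string(), "_data.coins".to_string()].into_iter().collect();
       // get_field(data, 'coins') > 5
       let pred = Expr::Gt(
           Box::new(Expr::GetField(Box::new(Expr::Column("data".into())), "coins".into())),
           Box::new(Expr::Literal(5)),
       );
       let rewritten = rewrite(pred, &file_columns);
       assert_eq!(
           rewritten,
           Expr::Gt(Box::new(Expr::Column("_data.coins".into())), Box::new(Expr::Literal(5)))
       );
       println!("rewritten predicate: {rewritten:?}");
   }
   ```

   The remaining piece discussed in the comment, building a `ProjectionMask` over leaf indices rather than root columns, would then be derived from which shredded columns the rewritten predicate references.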






Re: [PR] Per file filter evaluation [datafusion]

2025-07-02 Thread via GitHub


alamb commented on PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#issuecomment-3029071696

   Sorry @adriangb -- I lost track of this PR -- I will put it on my review queue for tomorrow





Re: [PR] Per file filter evaluation [datafusion]

2025-07-01 Thread via GitHub


adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2178912618


##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -252,16 +257,20 @@ impl FileOpener for ParquetOpener {
 // This evaluates missing columns and inserts any necessary casts.
 let predicate = predicate
 .map(|p| {
-PhysicalExprSchemaRewriter::new(
+let mut rewriter = PhysicalExprSchemaRewriter::new(
 &physical_file_schema,
 &logical_file_schema,
 )
 .with_partition_columns(
 partition_fields.to_vec(),
 file.partition_values,
-)
-.rewrite(p)
-.map_err(ArrowError::from)
+);
+if let Some(predicate_rewrite_hook) = predicate_rewrite_hook.as_ref()
+{
+rewriter = rewriter
+.with_rewrite_hook(Arc::clone(predicate_rewrite_hook));
+};
+rewriter.rewrite(p).map_err(ArrowError::from)

Review Comment:
   I'm thinking maybe these should be decoupled and it just becomes two different rewrite passes.
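   The "two decoupled rewrite passes" idea can be sketched abstractly. This is a hypothetical illustration only: `String` stands in for `Arc<dyn PhysicalExpr>`, and the pass names are made up to mirror the schema rewriter and the custom hook discussed here.

   ```rust
   // A rewrite pass over some expression representation; passes are applied
   // one after another rather than being entangled in a single rewriter.
   type Pass = Box<dyn Fn(String) -> String>;

   fn run_passes(expr: String, passes: &[Pass]) -> String {
       passes.iter().fold(expr, |e, pass| pass(e))
   }

   fn main() {
       // Pass 1: schema adaptation (stands in for PhysicalExprSchemaRewriter).
       let adapt: Pass = Box::new(|e| e.replace("col", "file_col"));
       // Pass 2: a custom hook (stands in for a per-file predicate rewrite hook).
       let hook: Pass = Box::new(|e| format!("simplify({e})"));
       let out = run_passes("col > 5".to_string(), &[adapt, hook]);
       assert_eq!(out, "simplify(file_col > 5)");
       println!("{out}");
   }
   ```

   Keeping the passes separate like this makes the ordering question explicit: each pass sees the full output of the previous one, instead of hooks being threaded through the middle of another rewriter.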






Re: [PR] Per file filter evaluation [datafusion]

2025-05-16 Thread via GitHub


adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2093694833


##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -55,8 +57,8 @@ pub(super) struct ParquetOpener {
pub limit: Option<usize>,
 /// Optional predicate to apply during the scan
pub predicate: Option<Arc<dyn PhysicalExpr>>,
-/// Schema of the output table
-pub table_schema: SchemaRef,
+/// Schema of the file as far as the rest of the system is concerned.
+pub logical_file_schema: SchemaRef,

Review Comment:
   I think this rename is worth it - there's been constant confusion even amongst maintainers about this. And this is only public for internal use.






Re: [PR] Per file filter evaluation [datafusion]

2025-04-15 Thread via GitHub


jayzhan211 commented on PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2801563360

   > Is this the right API, or should some of the information on how to optimize / adapt to a specific file schema live in PhysicalExpr::with_file_schema or something like that?
   
   Rewriting with the file schema is specialized to filters; if you add `PhysicalExpr::with_file_schema`, that means you may expect other kinds of rewrites for other `PhysicalExpr`s. I don't think we need this so far.
   
   > Can we use this mechanism both for variant_get(col, 'key') -> col.typed_value.key as well as cast(a, i64) = 100 -> cast(cast(a,i32),i64) = 100 or is that mixing too many things into the same API? 
   
   Makes sense to me if we have many rules inside the `filter_rewrite_with_file_schema` logic, as long as the rewrite is leveraged on the provided file schema.





Re: [PR] Per file filter evaluation [datafusion]

2025-04-14 Thread via GitHub


adriangb commented on PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2801415154

   > This is likely only applied to parquet filter so we can rewrite the filter when we know the filter + file_schema + table_schema (probably `build_row_filter`). We don't need optimize rule or trait method unless this rule could be applied more generally.
   
   Yes agreed, that's basically what's in this PR currently: a custom trait to implement an optimizer pass with all of that information available.
   
   My questions are:
   1. Is this the right API, or should some of the information on how to optimize / adapt to a specific file schema live in `PhysicalExpr::with_file_schema` or something like that?
   2. Can we use this mechanism both for `variant_get(col, 'key')` -> `col.typed_value.key` as well as `cast(a, i64) = 100` -> `cast(cast(a,i32),i64) = 100` -> `a = 100`, or is that mixing too many things into the same API?





Re: [PR] Per file filter evaluation [datafusion]

2025-04-14 Thread via GitHub


jayzhan211 commented on PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2801368982

   > And if that all makes sense... how do we do those optimizations? Is it 
something like an optimizer that has to downcast match on the expressions, or 
do we add methods to PhysicalExpr for each expression to describe how it 
handles this behavior?
   
   This is likely only applied to parquet filter so we can rewrite the filter 
when we know the filter + file_schema + table_schema (probably 
`build_row_filter`). We don't need optimize rule or trait method unless this 
rule could be applied more generally.





Re: [PR] Per file filter evaluation [datafusion]

2025-04-13 Thread via GitHub


adriangb commented on PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2800379042

   > Another question is, isn't the filter created based on table schema? And 
then the batch is read as file schema and casted to table schema and is 
evaluated by filter.
   
   Yes this is exactly the case.
   
   What we could do is rewrite the filter based on the file schema. Assume we 
have `cast(a, i64) = 100`, where `a` is i32 in the table schema and i64 in the 
file schema. We rewrite it to `cast(cast(a,i32),i64) = 100` and then optimize 
it to `a = 100`.
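   As a hedged illustration of that cast-unwrapping idea, here is a minimal 
std-only Rust sketch with a toy expression type (the `Expr` enum, 
`unwrap_redundant_cast`, and the string type names are inventions for this 
sketch, not DataFusion's actual `PhysicalExpr` API):

```rust
// Toy model of the rewrite: when `a` is physically i64 in the file, the
// adapter would otherwise produce `cast(cast(a, i32), i64) = 100`; unwrapping
// the redundant cast lets `a = 100` run directly on the file's column.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Cast(Box<Expr>, &'static str), // (inner expression, target type name)
    LitI64(i64),
    Eq(Box<Expr>, Box<Expr>),
}

/// Unwrap `cast(col, T) = lit` to `col = lit` when the file already stores
/// the column as T, so the cast is a no-op for this particular file.
fn unwrap_redundant_cast(expr: Expr, file_type_of_col: &str) -> Expr {
    if let Expr::Eq(left, right) = &expr {
        if let (Expr::Cast(inner, to), Expr::LitI64(v)) = (left.as_ref(), right.as_ref()) {
            if let Expr::Column(name) = inner.as_ref() {
                if *to == file_type_of_col {
                    // Cast target matches the file's physical type:
                    // compare the raw column against the literal directly.
                    return Expr::Eq(
                        Box::new(Expr::Column(name.clone())),
                        Box::new(Expr::LitI64(*v)),
                    );
                }
            }
        }
    }
    expr // anything else: leave the filter unchanged
}
```

For a file where `a` is physically i64, 
`unwrap_redundant_cast(cast(a, i64) = 100, "i64")` returns `a = 100`; for a 
file where `a` is i32 the expression is left untouched.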
   
   Yes that is exactly what I am proposing above, what method it happens by is 
not that important to me.
   
   The other point is if we can use this same mechanism to handle shredding for 
the variant type. In other words, can we "optimize" `variant_get(col, 'key')` 
to `col.typed_value.key` if we know from the file schema that `key` is shredded 
for this specific file.
   
   And if that all makes sense... how do we do those optimizations? Is it 
something like an optimizer that has to downcast match on the expressions, or 
do we add methods to PhysicalExpr for each expression to describe how it 
handles this behavior?





Re: [PR] Per file filter evaluation [datafusion]

2025-04-13 Thread via GitHub


jayzhan211 commented on PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2800373423

   > PhysicalExpr::with_schema
   
   This method is too general, and it is unclear what each `PhysicalExpr` 
would need to do with the provided schema; I don't think it is a good idea.
   
   > I suspect the hard bit with this approach will be edge cases: what if a 
filter cannot adapt itself to the file schema, but we could cast the column to 
make it work? I'm thinking something like a UDF that only accepts Utf8 but the 
the file produces Utf8View
   
   I think it is unavoidable that we need to cast the columns to be able to 
evaluate the filter.
   
   Another question is: isn't the filter created based on the table schema? 
The batch is then read with the file schema, cast to the table schema, and 
evaluated by the filter. What we could do instead is rewrite the filter based 
on the file schema. Assume we have `cast(a, i64) = 100`, where `a` is i32 in 
the table schema and i64 in the file schema. We rewrite it to 
`cast(cast(a,i32),i64) = 100` and then optimize it to `a = 100`. In your 
example where the UDF only accepts `Utf8`, we know there is no optimization we 
can do, so we just end up with the additional cast from file schema to table 
schema.
   





Re: [PR] Per file filter evaluation [datafusion]

2025-04-13 Thread via GitHub


adriangb commented on PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#issuecomment-282196

   I would like to resume this work.
   
   Some thoughts: should the rewrite happen via a new trait as I'm currently 
doing, or should we add a method `PhysicalExpr::with_schema`?
If we add `with_schema`, what schema do we pass it? The actual file schema? 
There's something to be said for that: it could rewrite filters to cast the 
literals / filters instead of casting the columns/arrays [as is currently 
done](https://github.com/pydantic/datafusion/blob/0b01fdf7f02f9097c319204058576f420b9790b4/datafusion/datasource-parquet/src/row_filter.rs#L146),
 which should be cheaper. I expect that any time it was okay to cast the data 
it was also okay to cast the predicate itself. It could also absorb the work of 
[reassign_predicate_columns](https://github.com/pydantic/datafusion/blob/0b01fdf7f02f9097c319204058576f420b9790b4/datafusion/datasource-parquet/src/row_filter.rs#L123)
 (we implement it for `Column` such that if its index doesn't match but 
another one does, it swaps).
   
   I suspect the hard bit with this approach will be edge cases: what if a 
filter _cannot_ adapt itself to the file schema, but we could cast the column 
to make it work? I'm thinking of something like a UDF that only accepts `Utf8` 
but the file produces `Utf8View` 🤔 
   
   
   I think @jayzhan-synnada proposed something similar in 
https://github.com/apache/datafusion/pull/15685/files#diff-2b3f5563d9441d3303b57e58e804ab07a10d198973eed20e7751b5a20b955e42.
   
   @alamb any thoughts?





Re: [PR] Per file filter evaluation [datafusion]

2025-03-15 Thread via GitHub


adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r1988141252


##
datafusion/datasource-parquet/src/source.rs:
##
@@ -559,24 +556,8 @@ impl FileSource for ParquetSource {
 .predicate()
 .map(|p| format!(", predicate={p}"))
 .unwrap_or_default();
-let pruning_predicate_string = self
-.pruning_predicate()
-.map(|pre| {
-let mut guarantees = pre
-.literal_guarantees()
-.iter()
-.map(|item| format!("{}", item))
-.collect_vec();
-guarantees.sort();
-format!(
-", pruning_predicate={}, required_guarantees=[{}]",
-pre.predicate_expr(),
-guarantees.join(", ")
-)
-})
-.unwrap_or_default();
 
-write!(f, "{}{}", predicate_string, pruning_predicate_string)

Review Comment:
   This is what's causing tests to fail. Tests assert against the formatting 
of a `ParquetSource` and access its pruning predicate method. I'm not sure 
whether we should rewrite the tests and somehow make the generated predicates 
accessible, bin them, etc.






Re: [PR] Per file filter evaluation [datafusion]

2025-03-15 Thread via GitHub


alamb commented on PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2726544602

   I will try and give this a look over the next few days





Re: [PR] Per file filter evaluation [datafusion]

2025-03-13 Thread via GitHub


adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r1994289815


##
datafusion/datasource-parquet/src/source.rs:
##
@@ -394,6 +389,26 @@ impl ParquetSource {
 self
 }
 
+/// Return the optional filter expression rewriter factory
+pub fn filter_expression_rewriter_factory(
+&self,
+) -> Option<&Arc<dyn FileExpressionRewriter>> {
+self.filter_expression_rewriter.as_ref()
+}
+
+/// Set optional filter expression rewriter.
+///
+/// [`FileExpressionRewriter`] allows users to specify how filter

Review Comment:
   ```suggestion
   /// [`FileExpressionRewriter`] allows specifying how filter
   ```






Re: [PR] Per file filter evaluation [datafusion]

2025-03-13 Thread via GitHub


adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r1994289444


##
datafusion/datasource/src/file_expr_rewriter.rs:
##
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_physical_plan::PhysicalExpr;
+
+/// Rewrite an expression to take into account this file's particular schema.
+/// This can be used to evaluate expressions against shredded variant columns 
or columns that pre-compute expressions (e.g. `day(timestamp)`).
+pub trait FileExpressionRewriter: Debug + Send + Sync {
+/// Rewrite an expression in the context of a file schema.
+fn rewrite(
+&self,
+file_schema: SchemaRef,
+expr: Arc<dyn PhysicalExpr>,
+) -> Result<Arc<dyn PhysicalExpr>>;
+}

Review Comment:
   Note: if users need the `table_schema` they can bind that inside of 
`TableProvider::scan`
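   To make the shape of this API concrete, here is a hedged, std-only sketch 
of implementing the trait for variant shredding. The `SchemaRef` and 
`PhysicalExpr` aliases below are toy substitutes (a schema is a list of column 
names, an expression is a string) so the sketch compiles without arrow or 
DataFusion; only the trait shape mirrors the diff, and `ShreddedVariantRewriter` 
is an invention for illustration:

```rust
use std::fmt::Debug;
use std::sync::Arc;

// Stand-ins so the sketch is self-contained.
type SchemaRef = Arc<Vec<String>>;
type PhysicalExpr = String;
type Result<T> = std::result::Result<T, String>;

/// Mirrors the trait in the diff: rewrite one expression for one file.
trait FileExpressionRewriter: Debug + Send + Sync {
    fn rewrite(
        &self,
        file_schema: SchemaRef,
        expr: Arc<PhysicalExpr>,
    ) -> Result<Arc<PhysicalExpr>>;
}

/// Rewrites `variant_get(col, 'key')` to the shredded column
/// `col.typed_value.key`, but only when that column actually exists in this
/// particular file's schema (i.e. this file shredded `key`).
#[derive(Debug)]
struct ShreddedVariantRewriter;

impl FileExpressionRewriter for ShreddedVariantRewriter {
    fn rewrite(
        &self,
        file_schema: SchemaRef,
        expr: Arc<PhysicalExpr>,
    ) -> Result<Arc<PhysicalExpr>> {
        if expr.as_str() == "variant_get(col, 'key')"
            && file_schema.iter().any(|c| c == "col.typed_value.key")
        {
            return Ok(Arc::new("col.typed_value.key".to_string()));
        }
        Ok(expr) // unshredded file: leave the expression as-is
    }
}
```

Because the rewriter receives each file's own schema, the same filter can hit 
the shredded fast path in one file and fall back to the unshredded expression 
in the next.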






Re: [PR] Per file filter evaluation [datafusion]

2025-03-12 Thread via GitHub


adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r1992124608


##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -83,10 +87,23 @@ pub(super) struct ParquetOpener {
 pub enable_bloom_filter: bool,
 /// Schema adapter factory
pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+/// Filter expression rewriter factory
+pub filter_expression_rewriter: Option<Arc<dyn FileExpressionRewriter>>,
 }
 
 impl FileOpener for ParquetOpener {
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
+// Note about schemas: we are actually dealing with _4_ different 
schemas here:
+// - The table schema as defined by the TableProvider. This is what 
the user sees, what they get when they `SELECT * FROM table`, etc.
+// - The "virtual" file schema: this is the table schema minus any 
hive partition columns. This is what the file schema is coerced to.
+// - The physical file schema: this is the schema as defined by the 
parquet file. This is what the parquet file actually contains.
+// - The filter schema: a hybrid of the virtual file schema and the 
physical file schema.
+//   If a filter is rewritten to reference columns that are in the 
physical file schema but not the virtual file schema, we need to add those 
columns to the filter schema so that the filter can be evaluated.
+//   This schema is generated by taking any columns from the virtual 
file schema that are referenced by the filter and adding any columns from the 
physical file schema that are referenced by the filter but not in the virtual 
file schema.
+//   Columns from the virtual file schema are added in the order they 
appear in the virtual file schema.
+//   The columns from the physical file schema are always added to the 
end of the schema, in the order they appear in the physical file schema.
+//
+// I think it might be wise to do some renaming of parameters where 
possible, e.g. rename `file_schema` to `table_schema_without_partition_columns` 
and `physical_file_schema` or something like that.

Review Comment:
   This is an interesting bit to ponder upon
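   The filter-schema construction described in the diff comment above can be 
sketched as a small std-only function (column names stand in for full arrow 
`Field`s; `build_filter_schema` and its signature are inventions for this 
sketch):

```rust
use std::collections::HashSet;

/// Build the "filter schema": columns from the virtual file schema that the
/// filter references, in virtual-schema order, followed by filter-referenced
/// columns that exist only in the physical file schema (e.g. shredded variant
/// columns), in physical-schema order.
fn build_filter_schema(
    virtual_schema: &[&str],
    physical_schema: &[&str],
    referenced: &HashSet<&str>,
) -> Vec<String> {
    let mut out: Vec<String> = virtual_schema
        .iter()
        .copied()
        .filter(|c| referenced.contains(c))
        .map(str::to_string)
        .collect();
    // Physical-only referenced columns always go at the end.
    out.extend(
        physical_schema
            .iter()
            .copied()
            .filter(|c| referenced.contains(c) && !virtual_schema.contains(c))
            .map(str::to_string),
    );
    out
}
```

For example, with virtual schema `[a, b]`, physical schema 
`[b, a, _struct_col.a]`, and a filter referencing `a` and `_struct_col.a`, the 
resulting filter schema is `[a, _struct_col.a]`.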






Re: [PR] Per file filter evaluation [datafusion]

2025-03-12 Thread via GitHub


adriangb commented on PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2718504764

   Okay folks, sorry for the churn; I thought this was in a better state than 
it ended up being.
   
   I've now reworked it to minimize the diff and make sure all existing tests 
pass. I'm going to add tests for the new functionality now to complement the 
example.





Re: [PR] Per file filter evaluation [datafusion]

2025-03-11 Thread via GitHub


adriangb commented on PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2715766181

   Okay I think I can answer my own question: 
https://github.com/pydantic/datafusion/blob/38356998059a2d08113401ea8111f238899ab0b8/datafusion/core/src/datasource/listing/table.rs#L961-L995
   
   Based on this it seems like it's safe to mark filters as exact if they are 
getting pushed down 😄 





Re: [PR] Per file filter evaluation [datafusion]

2025-03-11 Thread via GitHub


adriangb commented on PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2715592559

   The main issue I've found with this approach is marking filters as `Exact` 
or `Inexact`.
   In particular, unless you mark them as `Exact`, DataFusion will still need 
to pull the possibly large unshredded data to re-apply the filters in a 
`FilterExec`. This doesn't completely kill performance, because if the filter 
is selective there is less data to re-filter, but the worst case scenario is 
possibly worse than not having this feature at all. But I feel like this is a 
consequence of filter pushdown in general? Ignoring this change: if I have a 
`TableProvider` that returns a `DataSourceExec` and I have filter pushdown 
enabled, should I be marking _all_ of my filters as `Exact`? That seems 
dangerous, given that it's not documented anywhere that filter pushdown 
supports all the filters that `FilterExec` does, and given things like 
https://github.com/apache/datafusion/blob/9382add72b929c553ca4976d1423d8ebbc80889d/datafusion/datasource-parquet/src/row_filter.rs#L333-L336.





Re: [PR] Per file filter evaluation [datafusion]

2025-03-11 Thread via GitHub


adriangb commented on PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2712077738

   @alamb I think this is ready for a first round of review when you have a 
chance!





Re: [PR] Per file filter evaluation [datafusion]

2025-03-10 Thread via GitHub


adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r1988140038


##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -111,18 +109,18 @@ impl FileOpener for ParquetOpener {
 .schema_adapter_factory
 .create(projected_schema, Arc::clone(&self.table_schema));
 let predicate = self.predicate.clone();
-let pruning_predicate = self.pruning_predicate.clone();
-let page_pruning_predicate = self.page_pruning_predicate.clone();
 let table_schema = Arc::clone(&self.table_schema);
+let filter_expression_rewriter_factory =
+self.filter_expression_rewriter_factory.clone();
 let reorder_predicates = self.reorder_filters;
 let pushdown_filters = self.pushdown_filters;
-let enable_page_index = should_enable_page_index(
-self.enable_page_index,
-&self.page_pruning_predicate,
-);
+let enable_page_index = self.enable_page_index && 
self.predicate.is_some();

Review Comment:
   We need to decide upfront whether we need the page index or not.
   
   Previously the pruning predicate was calculated from the "file schema" that 
users pass in (which is obviously _not_ the schema of the physical file we are 
reading; it is expected to be the table schema minus the partition columns; I 
think that should be renamed / reworked, but that's for another day). This 
seems wrong to me: I think it should be using the physical file schema, since 
that's what the rg/page stats it will be loading match. So *I think* moving it 
here actually improves some edge cases. _But_ we now have to start loading the 
file before we know if we have any page index pruning predicate, so we may 
load the page index and then realize we don't have any applicable filters. 
That should only happen if we have a predicate and can't build a page pruning 
predicate from the transformed predicate; I don't think that will be all that 
common.



##
datafusion-examples/examples/struct_field_rewrite.rs:
##
@@ -0,0 +1,353 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray, 
StructArray};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow_schema::Fields;
+use async_trait::async_trait;
+
+use datafusion::assert_batches_eq;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::tree_node::{
+Transformed, TransformedResult, TreeNode, TreeNodeRewriter,
+};
+use datafusion::common::{DFSchema, Result};
+use datafusion::datasource::file_expr_rewriter::FileExpressionRewriter;
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::utils::conjunction;
+use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_expr::{expressions, ScalarFunctionExpr};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::lit;
+use futures::StreamExt;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+
+// Example showing how to implement custom filter rewriting for struct fields.
+//
+// In this example, we have a table with a struct column like:
+// struct_col: {"a": 1, "b": "foo"}
+//
+// Our custom TableProvider will use a FileExpressionRewriter to rewrite
+// expressions like `struct_col['a'] = 10` to use a flattened column name
+// `_struct_col.a` if it exists in the file schema.
+#[tokio::main]
+async fn main() -> Result<()> {
+println!("=== Creating example data with structs and flattened fields 
===");
+println!("NOTE: This example demonstrates filter expression rewriting for 
struct field access.");
+println!("  We deliberately create different values in the struct 

Re: [PR] Per file filter evaluation [datafusion]

2025-03-10 Thread via GitHub


adriangb commented on PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2712042344

   The example is now working and even does stats pruning of shredded columns 🚀 





Re: [PR] Per file filter evaluation [datafusion]

2025-03-07 Thread via GitHub


adriangb commented on PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2707505442

   Ok the example is now working and I think the overall approach seems good 
but probably needs a lot of tweaking.





Re: [PR] Per file filter evaluation [datafusion]

2025-03-06 Thread via GitHub


adriangb commented on PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2705670873

   The example is not working yet. It gets `Error: 
Shared(ArrowError(ExternalError(External(ComputeError("Error evaluating filter 
predicate: Internal(\"PhysicalExpr Column references column '_user_info.age' at 
index 0 (zero-based) but input schema only has 0 columns: []\")"))), None))`. I 
think some work will be needed with how the rewrites interact with projections 
for the filters.

