Re: [PR] Per file filter evaluation [datafusion]
alamb commented on PR #15057: URL: https://github.com/apache/datafusion/pull/15057#issuecomment-3074155476

Thank you @adriangb
Re: [PR] Per file filter evaluation [datafusion]
alamb merged PR #15057: URL: https://github.com/apache/datafusion/pull/15057
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on PR #15057: URL: https://github.com/apache/datafusion/pull/15057#issuecomment-3073986907

I plan on merging this once CI passes. It has been approved and reviewed, and we need the customization of the rewriters for https://github.com/apache/datafusion/issues/16235#issuecomment-3067125182. I will follow up with PRs to:

1. Add a no-op / disable rewriter.
2. Restore the hooks to add a custom SchemaAdapter.
3. Document in the changelog how to restore the old behavior if users don't want to buy into the new APIs just yet.
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2207679888
##
datafusion-examples/examples/default_column_values.rs:
##
@@ -0,0 +1,366 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use arrow::array::RecordBatch;
+use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
+use async_trait::async_trait;
+
+use datafusion::assert_batches_eq;
+use datafusion::catalog::memory::DataSourceExec;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion::common::DFSchema;
+use datafusion::common::{Result, ScalarValue};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::utils::conjunction;
+use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_expr::expressions::{CastExpr, Column, Literal};
+use datafusion::physical_expr::schema_rewriter::{
+    DefaultPhysicalExprAdapter, PhysicalExprAdapter,
+};
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{lit, SessionConfig};
+use futures::StreamExt;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+
+// Metadata key for storing default values in field metadata
+const DEFAULT_VALUE_METADATA_KEY: &str = "example.default_value";
+
+// Example showing how to implement custom default value handling for missing columns
+// using field metadata and PhysicalExprAdapter.
+//
+// This example demonstrates how to:
+// 1. Store default values in field metadata using a constant key
+// 2. Create a custom PhysicalExprAdapter that reads these defaults
+// 3. Inject default values for missing columns in filter predicates
+// 4. Use the DefaultPhysicalExprAdapter as a fallback for standard schema adaptation
+// 5. Wrap string default values in cast expressions for proper type conversion
+//
+// Important: PhysicalExprAdapter is specifically designed for rewriting filter predicates
+// that get pushed down to file scans. For handling missing columns in projections,
+// other mechanisms in DataFusion are used (like SchemaAdapter).
+//
+// The metadata-based approach provides a flexible way to store default values as strings
+// and cast them to the appropriate types at query time.
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    println!("=== Creating example data with missing columns and default values ===");
+
+    // Create sample data where the logical schema has more columns than the physical schema
+    let (logical_schema, physical_schema, batch) = create_sample_data_with_defaults();
+
+    let store = InMemory::new();
+    let buf = {
+        let mut buf = vec![];
+
+        let props = WriterProperties::builder()
+            .set_max_row_group_size(2)
+            .build();
+
+        let mut writer =
+            ArrowWriter::try_new(&mut buf, physical_schema.clone(), Some(props))
+                .expect("creating writer");
+
+        writer.write(&batch).expect("Writing batch");
+        writer.close().unwrap();
+        buf
+    };
+    let path = Path::from("example.parquet");
+    let payload = PutPayload::from_bytes(buf.into());
+    store.put(&path, payload).await?;
+
+    // Create a custom table provider that handles missing columns with defaults
+    let table_provider = Arc::new(DefaultValueTableProvider::new(logical_schema));
+
+    // Set up query execution
+    let mut cfg = SessionConfig::new();
+    cfg.options_mut().execution.parquet.pushdown_filters = true;
+    let ctx = SessionContext::new_with_config(cfg);
+
+    // Register our table
+    ctx.register_table("example_table", table_provider)?;
+
+    ctx.runtime_env().
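The example quoted above uses a PhysicalExprAdapter to rewrite pushed-down filter predicates so that references to columns absent from a file's physical schema are replaced with default values from field metadata. The core rewrite idea can be sketched with a toy, std-only expression tree; the `ToyExpr` type and `rewrite` function below are hypothetical stand-ins, not DataFusion's `PhysicalExpr` API:

```rust
use std::collections::HashMap;

// Toy expression tree standing in for a physical filter expression
// (hypothetical; DataFusion's real PhysicalExpr is a trait object tree).
#[derive(Debug, Clone, PartialEq)]
enum ToyExpr {
    Column(String),
    Literal(i64),
    Eq(Box<ToyExpr>, Box<ToyExpr>),
}

// Rewrite pass in the spirit of PhysicalExprAdapter: any column reference
// absent from the file's physical schema is replaced by its default value.
fn rewrite(
    expr: ToyExpr,
    file_columns: &[&str],
    defaults: &HashMap<&str, i64>,
) -> ToyExpr {
    match expr {
        ToyExpr::Column(name) if !file_columns.contains(&name.as_str()) => {
            // Missing column: inject its metadata-provided default as a literal.
            ToyExpr::Literal(*defaults.get(name.as_str()).unwrap_or(&0))
        }
        ToyExpr::Eq(l, r) => ToyExpr::Eq(
            Box::new(rewrite(*l, file_columns, defaults)),
            Box::new(rewrite(*r, file_columns, defaults)),
        ),
        other => other,
    }
}

fn main() {
    let defaults = HashMap::from([("version", 1_i64)]);
    // Predicate `version = 1` pushed down to a file whose physical schema
    // only has `id` and `name` columns.
    let pred = ToyExpr::Eq(
        Box::new(ToyExpr::Column("version".into())),
        Box::new(ToyExpr::Literal(1)),
    );
    let rewritten = rewrite(pred, &["id", "name"], &defaults);
    // The missing-column reference collapses to a literal, so the filter can
    // still be evaluated against the file's data.
    assert_eq!(
        rewritten,
        ToyExpr::Eq(Box::new(ToyExpr::Literal(1)), Box::new(ToyExpr::Literal(1)))
    );
    println!("rewritten: {rewritten:?}");
}
```

The real adapter works per file, which is the point of this PR: each file can have a different physical schema, so the predicate is rewritten once per file rather than once per table.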
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2207512528
(quotes the same `default_column_values.rs` snippet as above)
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2207520996
(quotes the same `default_column_values.rs` snippet as above)
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2201901299
(quotes the same `default_column_values.rs` snippet as above)
Re: [PR] Per file filter evaluation [datafusion]
alamb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2201812153
(quotes the same `default_column_values.rs` snippet as above)
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2199354920
##
datafusion-examples/examples/default_column_values.rs:
##
@@ -0,0 +1,366 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use arrow::array::RecordBatch;
+use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
+use async_trait::async_trait;
+
+use datafusion::assert_batches_eq;
+use datafusion::catalog::memory::DataSourceExec;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion::common::DFSchema;
+use datafusion::common::{Result, ScalarValue};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::utils::conjunction;
+use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_expr::expressions::{CastExpr, Column, Literal};
+use datafusion::physical_expr::schema_rewriter::{
+    DefaultPhysicalExprAdapter, PhysicalExprAdapter,
+};
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{lit, SessionConfig};
+use futures::StreamExt;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+
+// Metadata key for storing default values in field metadata
+const DEFAULT_VALUE_METADATA_KEY: &str = "example.default_value";
+
+// Example showing how to implement custom default value handling for missing columns
+// using field metadata and PhysicalExprAdapter.
+//
+// This example demonstrates how to:
+// 1. Store default values in field metadata using a constant key
+// 2. Create a custom PhysicalExprAdapter that reads these defaults
+// 3. Inject default values for missing columns in filter predicates
+// 4. Use the DefaultPhysicalExprAdapter as a fallback for standard schema adaptation
+// 5. Wrap string default values in cast expressions for proper type conversion
+//
+// Important: PhysicalExprAdapter is specifically designed for rewriting filter predicates
+// that get pushed down to file scans. For handling missing columns in projections,
+// other mechanisms in DataFusion are used (like SchemaAdapter).
+//
+// The metadata-based approach provides a flexible way to store default values as strings
+// and cast them to the appropriate types at query time.
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    println!("=== Creating example data with missing columns and default values ===");
+
+    // Create sample data where the logical schema has more columns than the physical schema
+    let (logical_schema, physical_schema, batch) = create_sample_data_with_defaults();
+
+    let store = InMemory::new();
+    let buf = {
+        let mut buf = vec![];
+
+        let props = WriterProperties::builder()
+            .set_max_row_group_size(2)
+            .build();
+
+        let mut writer =
+            ArrowWriter::try_new(&mut buf, physical_schema.clone(), Some(props))
+                .expect("creating writer");
+
+        writer.write(&batch).expect("Writing batch");
+        writer.close().unwrap();
+        buf
+    };
+    let path = Path::from("example.parquet");
+    let payload = PutPayload::from_bytes(buf.into());
+    store.put(&path, payload).await?;
+
+    // Create a custom table provider that handles missing columns with defaults
+    let table_provider = Arc::new(DefaultValueTableProvider::new(logical_schema));
+
+    // Set up query execution
+    let mut cfg = SessionConfig::new();
+    cfg.options_mut().execution.parquet.pushdown_filters = true;
+    let ctx = SessionContext::new_with_config(cfg);
+
+    // Register our table
+    ctx.register_table("example_table", table_provider)?;
+
+    ctx.runtime_env().
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2198638535
##
datafusion-examples/examples/default_column_values.rs:
##
Re: [PR] Per file filter evaluation [datafusion]
alamb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2192262877
##
datafusion-examples/examples/variant_shredding.rs:
##
@@ -0,0 +1,408 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{RecordBatch, StringArray};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+
+use datafusion::assert_batches_eq;
+use datafusion::catalog::memory::DataSourceExec;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::tree_node::{
+    Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
+};
+use datafusion::common::{assert_contains, DFSchema, Result};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::utils::conjunction;
+use datafusion::logical_expr::{
+    ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
+    TableProviderFilterPushDown, TableType, Volatility,
+};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_expr::schema_rewriter::PhysicalExprSchemaRewriteHook;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_expr::{expressions, ScalarFunctionExpr};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{lit, SessionConfig};
+use datafusion::scalar::ScalarValue;
+use futures::StreamExt;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+
+// Example showing how to implement custom filter rewriting for variant shredding.
Review Comment:
Since this example doesn't actually use variants (it uses JSON), what would you think about calling it `json_shredding` instead?
I can also help out with adding some more background here and diagrams -- I think the important things to point out are:
1. This is a technique for speeding up queries on "semi-structured" data such as JSON
2. It works by materializing ("shredding") one or more columns separately from the JSON to take advantage of columnar storage
3. However, this requires the query engine to rewrite predicates against the semi-structured column into predicates on shredded columns when available
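The predicate rewrite in point 3 can be sketched with a toy expression tree. Everything below (the `Expr` enum, `rewrite_shredded`, and the `_data.name` naming convention) is an illustrative stand-in, not DataFusion's actual `PhysicalExpr` API:

```rust
// Toy expression tree -- an illustrative stand-in for PhysicalExpr.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(String),
    // json_get(expr, 'key'): extract a key from a semi-structured column
    JsonGet(Box<Expr>, String),
    Eq(Box<Expr>, Box<Expr>),
}

/// Rewrite `json_get(Column(c), key)` into a reference to the shredded
/// column `_{c}.{key}` when this file's physical schema contains it.
fn rewrite_shredded(expr: Expr, file_columns: &[&str]) -> Expr {
    match expr {
        Expr::JsonGet(inner, key) => {
            if let Expr::Column(c) = inner.as_ref() {
                let shredded = format!("_{c}.{key}");
                if file_columns.contains(&shredded.as_str()) {
                    return Expr::Column(shredded);
                }
            }
            Expr::JsonGet(Box::new(rewrite_shredded(*inner, file_columns)), key)
        }
        Expr::Eq(l, r) => Expr::Eq(
            Box::new(rewrite_shredded(*l, file_columns)),
            Box::new(rewrite_shredded(*r, file_columns)),
        ),
        // Columns and literals pass through untouched
        other => other,
    }
}
```

A file that shredded `name` out of `data` gets the columnar fast path, while a file that did not keeps the original `json_get` predicate -- which is why this rewrite has to run per file.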
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2190141364
##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -233,21 +238,31 @@ impl FileOpener for ParquetOpener {
}
}
+        predicate = predicate
+            .map(|p| {
+                if let Some(predicate_rewrite_hook) = predicate_rewrite_hook.as_ref() {
+                    predicate_rewrite_hook.rewrite(Arc::clone(&p), &physical_file_schema)
+                } else {
+                    Ok(p)
+                }
+            })
+            .transpose()?;
Review Comment:
It's not clear to me if it's better if this runs before the
`PhysicalExprSchemaRewriter` and the simplifier or somehow during it. Running
it after doesn't really work because expressions may be simplified away or
filled in with nulls (e.g. because it's accessing a shredded variant field that
doesn't exist at the table schema level)
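The ordering concern can be made concrete with a toy two-pass pipeline (hypothetical names, not DataFusion's API): `null_fill` stands in for the default missing-column handling and `shred_hook` for a custom rewrite. Once the default pass has replaced a column with a null literal, the hook has nothing left to match:

```rust
// Toy two-pass rewrite -- illustrative only.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    NullLiteral,
}

/// Default pass: a column the file does not contain becomes a null literal.
fn null_fill(expr: Expr, file_columns: &[&str]) -> Expr {
    match expr {
        Expr::Column(c) if !file_columns.contains(&c.as_str()) => Expr::NullLiteral,
        other => other,
    }
}

/// Custom hook: redirect a logical column to its file-specific shredded column.
fn shred_hook(expr: Expr) -> Expr {
    match expr {
        Expr::Column(c) if c == "data.name" => Expr::Column("_data.name".to_string()),
        other => other,
    }
}
```

Running the hook first lets the shredded column survive; running the default pass first nulls the column out before the hook can see it.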
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2190043206
##
datafusion-examples/examples/variant_shredding.rs:
##
@@ -0,0 +1,398 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{RecordBatch, StringArray};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+
+use datafusion::assert_batches_eq;
+use datafusion::catalog::memory::DataSourceExec;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::tree_node::{Transformed, TreeNodeRecursion};
+use datafusion::common::{assert_contains, DFSchema, Result};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::utils::conjunction;
+use datafusion::logical_expr::{
+    ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
+    TableProviderFilterPushDown, TableType, Volatility,
+};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_expr::schema_rewriter::PhysicalExprSchemaRewriteHook;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_expr::{expressions, ScalarFunctionExpr};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{lit, SessionConfig};
+use datafusion::scalar::ScalarValue;
+use futures::StreamExt;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+
+// Example showing how to implement custom filter rewriting for variant shredding.
+//
+// In this example, we have a table with flat columns using underscore prefixes:
+// data: "...", _data.name: "..."
Review Comment:
But I think that needs to be its own initiative / PR
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2188925012
##
datafusion-examples/examples/variant_shredding.rs:
##
@@ -0,0 +1,398 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{RecordBatch, StringArray};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+
+use datafusion::assert_batches_eq;
+use datafusion::catalog::memory::DataSourceExec;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::tree_node::{Transformed, TreeNodeRecursion};
+use datafusion::common::{assert_contains, DFSchema, Result};
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::utils::conjunction;
+use datafusion::logical_expr::{
+    ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
+    TableProviderFilterPushDown, TableType, Volatility,
+};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_expr::schema_rewriter::PhysicalExprSchemaRewriteHook;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_expr::{expressions, ScalarFunctionExpr};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{lit, SessionConfig};
+use datafusion::scalar::ScalarValue;
+use futures::StreamExt;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+
+// Example showing how to implement custom filter rewriting for variant shredding.
+//
+// In this example, we have a table with flat columns using underscore prefixes:
+// data: "...", _data.name: "..."
Review Comment:
Well, as we've now found out in that issue, the limitation is actually on DataFusion's side. I think what we need for struct fields specifically is for ParquetOpener to generate the right ProjectionMask based on the expressions having a `get_field(col, 'a')` call. So we'd have:
1) SQL: `where name = 'Adrian' and get_field(details, 'coins') > 5`
2) Predicate in Parquet: `BinaryExpr(BinaryExpr(Col(0), Eq, lit('Adrian')), And, BinaryExpr(get_field(Col(1), 'coins'), Gt, lit(5)))`
3) Rewrite the predicate to `BinaryExpr(BinaryExpr(Col(0), Eq, lit('Adrian')), And, BinaryExpr(Col(1), Gt, lit(5)))` and make the projection mask `ProjectionMask::leaves(vec![0, 2])` or something like that instead of `ProjectionMask::roots`:
https://github.com/apache/datafusion/blob/12c40ca9499d23a3459c5f9f7712e9f62b3443a1/datafusion/datasource-parquet/src/row_filter.rs#L126-L129
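Deriving a leaves-based mask from the rewritten predicate could then be a simple walk that collects the referenced leaf indices. This is only a sketch with toy types -- the real `ProjectionMask::leaves` is constructed from the Parquet schema descriptor:

```rust
// Toy predicate over leaf column indices -- illustrative only.
#[derive(Debug)]
enum Expr {
    Col(usize),                                 // leaf index in the file schema
    Lit(&'static str),                          // literal stand-in
    Binary(Box<Expr>, &'static str, Box<Expr>), // op kept as a plain string
}

/// Collect the distinct, sorted leaf indices referenced by a predicate,
/// i.e. the argument for a ProjectionMask::leaves-style mask.
fn leaf_indices(expr: &Expr) -> Vec<usize> {
    fn walk(expr: &Expr, out: &mut Vec<usize>) {
        match expr {
            Expr::Col(i) => out.push(*i),
            Expr::Lit(_) => {}
            Expr::Binary(l, _, r) => {
                walk(l, out);
                walk(r, out);
            }
        }
    }
    let mut out = Vec::new();
    walk(expr, &mut out);
    out.sort_unstable();
    out.dedup();
    out
}
```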
Re: [PR] Per file filter evaluation [datafusion]
alamb commented on PR #15057: URL: https://github.com/apache/datafusion/pull/15057#issuecomment-3029071696 Sorry @adriangb -- I lost track of this PR -- I will put it on my review queue for tomorrow
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2178912618
##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -252,16 +257,20 @@ impl FileOpener for ParquetOpener {
        // This evaluates missing columns and inserts any necessary casts.
        let predicate = predicate
            .map(|p| {
-               PhysicalExprSchemaRewriter::new(
+               let mut rewriter = PhysicalExprSchemaRewriter::new(
                    &physical_file_schema,
                    &logical_file_schema,
                )
                .with_partition_columns(
                    partition_fields.to_vec(),
                    file.partition_values,
-               )
-               .rewrite(p)
-               .map_err(ArrowError::from)
+               );
+               if let Some(predicate_rewrite_hook) = predicate_rewrite_hook.as_ref() {
+                   rewriter = rewriter
+                       .with_rewrite_hook(Arc::clone(predicate_rewrite_hook));
+               };
+               rewriter.rewrite(p).map_err(ArrowError::from)
Review Comment:
I'm thinking maybe these should be decoupled and it just becomes two
different rewrite passes.
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r2093694833
##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -55,8 +57,8 @@ pub(super) struct ParquetOpener {
    pub limit: Option<usize>,
    /// Optional predicate to apply during the scan
    pub predicate: Option<Arc<dyn PhysicalExpr>>,
-   /// Schema of the output table
-   pub table_schema: SchemaRef,
+   /// Schema of the file as far as the rest of the system is concerned.
+   pub logical_file_schema: SchemaRef,
Review Comment:
I think this rename is worth it - there's been constant confusion even
amongst maintainers about this. And this is only public for internal use.
Re: [PR] Per file filter evaluation [datafusion]
jayzhan211 commented on PR #15057: URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2801563360
> Is this the right API, or should some of the information on how to optimize / adapt to a specific file schema live in PhysicalExpr::with_file_schema or something like that?

Rewriting against the file schema is specialized to filters; adding `PhysicalExpr::with_file_schema` implies you expect other kinds of rewrites for other `PhysicalExpr`s, which I don't think we need so far.

> Can we use this mechanism both for variant_get(col, 'key') -> col.typed_value.key as well as cast(a, i64) = 100 -> cast(cast(a,i32),i64) = 100 -> a = 100 or is that mixing too many things into the same API?

Makes sense to me if we have many rules inside the `filter_rewrite_with_file_schema` logic, as long as each rewrite leverages the provided file schema.
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on PR #15057: URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2801415154
> This is likely only applied to parquet filter so we can rewrite the filter when we know the filter + file_schema + table_schema (probably `build_row_filter`). We don't need optimize rule or trait method unless this rule could be applied more generally.

Yes, agreed -- that's basically what's in this PR currently: a custom trait to implement an optimizer pass with all of that information available. My questions are:
1. Is this the right API, or should some of the information on how to optimize / adapt to a specific file schema live in `PhysicalExpr::with_file_schema` or something like that?
2. Can we use this mechanism both for `variant_get(col, 'key')` -> `col.typed_value.key` as well as `cast(a, i64) = 100` -> `cast(cast(a,i32),i64) = 100` -> `a = 100`, or is that mixing too many things into the same API?
Re: [PR] Per file filter evaluation [datafusion]
jayzhan211 commented on PR #15057: URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2801368982
> And if that all makes sense... how do we do those optimizations? Is it something like an optimizer that has to downcast match on the expressions, or do we add methods to PhysicalExpr for each expression to describe how it handles this behavior?

This likely applies only to the Parquet filter, so we can rewrite the filter once we know the filter + file_schema + table_schema (probably in `build_row_filter`). We don't need an optimizer rule or a trait method unless this rule could be applied more generally.
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on PR #15057: URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2800379042 > Another question is, isn't the filter created based on table schema? And then the batch is read as file schema and casted to table schema and is evaluated by filter. Yes, this is exactly the case. > What we could do is rewrite the filter based on file schema. Assume we have `cast(a, i64) = 100`, `a` is i32 in table schema and i64 in file schema. We rewrite it to `cast(cast(a, i32), i64) = 100` and then optimize it to `a = 100`. Yes, that is exactly what I am proposing above; which mechanism it happens by is not that important to me. The other point is whether we can use this same mechanism to handle shredding for the variant type. In other words, can we "optimize" `variant_get(col, 'key')` to `col.typed_value.key` if we know from the file schema that `key` is shredded for this specific file? And if that all makes sense... how do we do those optimizations? Is it something like an optimizer that has to downcast-match on the expressions, or do we add methods to `PhysicalExpr` for each expression to describe how it handles this behavior?
Re: [PR] Per file filter evaluation [datafusion]
jayzhan211 commented on PR #15057: URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2800373423 > PhysicalExpr::with_schema This method is too general, and it is unclear what we would need to do with the provided schema for each `PhysicalExpr`; I don't think it is a good idea. > I suspect the hard bit with this approach will be edge cases: what if a filter cannot adapt itself to the file schema, but we could cast the column to make it work? I'm thinking something like a UDF that only accepts Utf8 but the file produces Utf8View I think it is unavoidable that we need to cast the columns to be able to evaluate the filter. Another question is: isn't the filter created based on the table schema? The batch is then read with the file schema, cast to the table schema, and evaluated by the filter. What we could do is rewrite the filter based on the file schema. Assume we have `cast(a, i64) = 100`, where `a` is i32 in the table schema and i64 in the file schema. We rewrite it to `cast(cast(a, i32), i64) = 100` and then optimize it to `a = 100`. In your example where the UDF only accepts Utf8, we know there is no optimization we can do, so we just end up with additional casting from the file schema to the table schema.
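The cast-elimination rewrite discussed above can be sketched on a toy expression tree. These enums are illustrative only, not DataFusion's real `PhysicalExpr` API, and the sketch ignores that collapsing `cast(cast(a, i32), i64)` back to `a` is only valid when the narrowing cast cannot truncate:

```rust
// Toy expression model for the proposed per-file filter rewrite.
#[derive(Debug, Clone, PartialEq)]
enum Ty { I32, I64 }

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Col { name: String, ty: Ty },      // column with its file-schema type
    Cast { inner: Box<Expr>, to: Ty }, // cast expression
    Lit(i64),
    Eq(Box<Expr>, Box<Expr>),
}

/// Drop casts that are redundant given the file schema: `cast(col, t)` where
/// `col` is already of type `t`, collapsing nested casts along the way.
/// NOTE: a real rewrite must also prove the inner narrowing cast cannot
/// truncate before dropping it; this sketch skips that check.
fn simplify(expr: Expr) -> Expr {
    match expr {
        Expr::Eq(l, r) => Expr::Eq(Box::new(simplify(*l)), Box::new(simplify(*r))),
        Expr::Cast { inner, to } => {
            let inner = simplify(*inner);
            match inner {
                Expr::Col { name, ty } => {
                    if ty == to {
                        // cast to the column's own type: a no-op, remove it
                        Expr::Col { name, ty }
                    } else {
                        Expr::Cast { inner: Box::new(Expr::Col { name, ty }), to }
                    }
                }
                // cast(cast(x, _), to): retry with the outermost target type
                Expr::Cast { inner: i2, .. } => simplify(Expr::Cast { inner: i2, to }),
                other => Expr::Cast { inner: Box::new(other), to },
            }
        }
        other => other,
    }
}
```

With `a` being i64 in the file schema, `cast(cast(a, i32), i64) = 100` simplifies to `a = 100`, matching the example in the comment.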
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on PR #15057: URL: https://github.com/apache/datafusion/pull/15057#issuecomment-282196 I would like to resume this work. Some thoughts: should the rewrite happen via a new trait as I'm currently doing, or should we add a method `PhysicalExpr::with_schema`? If we add `with_schema`, what schema do we pass it? The actual file schema? There's something to be said for that: it could rewrite filters to cast the literals / filters instead of casting the columns/arrays [as is currently done](https://github.com/pydantic/datafusion/blob/0b01fdf7f02f9097c319204058576f420b9790b4/datafusion/datasource-parquet/src/row_filter.rs#L146), which should be cheaper. I expect that any time it was okay to cast the data it was also okay to cast the predicate itself. It could also absorb the work of [reassign_predicate_columns](https://github.com/pydantic/datafusion/blob/0b01fdf7f02f9097c319204058576f420b9790b4/datafusion/datasource-parquet/src/row_filter.rs#L123) (we implement it for `Column` such that if its index doesn't match but another one does, it swaps). I suspect the hard bit with this approach will be edge cases: what if a filter _cannot_ adapt itself to the file schema, but we could cast the column to make it work? I'm thinking something like a UDF that only accepts `Utf8` but the file produces `Utf8View` 🤔 I think @jayzhan-synnada proposed something similar in https://github.com/apache/datafusion/pull/15685/files#diff-2b3f5563d9441d3303b57e58e804ab07a10d198973eed20e7751b5a20b955e42. @alamb any thoughts?
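Why casting the literal is cheaper: the cast runs once on one scalar instead of once per row on an array. The helper below is hypothetical, purely to illustrate the safety condition; the caveat is that pushing a cast onto the literal is only valid when the literal is exactly representable in the column's type:

```rust
/// Hypothetical helper for the "cast the literal instead of the column" idea:
/// for `cast(col_i32, i64) > lit_i64`, try to turn the i64 literal into an i32
/// so the per-row column cast can be dropped. Returns None when the literal
/// does not fit in i32, in which case the rewrite must not be applied.
fn push_cast_to_literal(lit_i64: i64) -> Option<i32> {
    // Only safe when the value round-trips exactly; `try_from` checks that.
    i32::try_from(lit_i64).ok()
}
```

When this returns `None` (e.g. the literal exceeds `i32::MAX`), the predicate keeps its original form and the column data is cast instead.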
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r1988141252
##
datafusion/datasource-parquet/src/source.rs:
##
@@ -559,24 +556,8 @@ impl FileSource for ParquetSource {
             .predicate()
             .map(|p| format!(", predicate={p}"))
             .unwrap_or_default();
-        let pruning_predicate_string = self
-            .pruning_predicate()
-            .map(|pre| {
-                let mut guarantees = pre
-                    .literal_guarantees()
-                    .iter()
-                    .map(|item| format!("{}", item))
-                    .collect_vec();
-                guarantees.sort();
-                format!(
-                    ", pruning_predicate={}, required_guarantees=[{}]",
-                    pre.predicate_expr(),
-                    guarantees.join(", ")
-                )
-            })
-            .unwrap_or_default();
-        write!(f, "{}{}", predicate_string, pruning_predicate_string)
Review Comment:
This is what's causing tests to fail. Tests assert against the formatting of
a ParquetSource and access its pruning predicate method. I'm not sure if we
should rewrite the tests and somehow make the generated predicates accessible,
bin them, etc.
Re: [PR] Per file filter evaluation [datafusion]
alamb commented on PR #15057: URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2726544602 I will try and give this a look over the next few days
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r1994289815
##
datafusion/datasource-parquet/src/source.rs:
##
@@ -394,6 +389,26 @@ impl ParquetSource {
         self
     }
+
+    /// Return the optional filter expression rewriter factory
+    pub fn filter_expression_rewriter_factory(
+        &self,
+    ) -> Option<&Arc<dyn FileExpressionRewriter>> {
+        self.filter_expression_rewriter.as_ref()
+    }
+
+    /// Set optional filter expression rewriter.
+    ///
+    /// [`FileExpressionRewriter`] allows users to specify how filter
Review Comment:
```suggestion
/// [`FileExpressionRewriter`] allows specifying how filter
```
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r1994289444
##
datafusion/datasource/src/file_expr_rewriter.rs:
##
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_physical_plan::PhysicalExpr;
+
+/// Rewrite an expression to take into account this file's particular schema.
+/// This can be used to evaluate expressions against shredded variant columns
+/// or columns that pre-compute expressions (e.g. `day(timestamp)`).
+pub trait FileExpressionRewriter: Debug + Send + Sync {
+    /// Rewrite an expression in the context of a file schema.
+    fn rewrite(
+        &self,
+        file_schema: SchemaRef,
+        expr: Arc<dyn PhysicalExpr>,
+    ) -> Result<Arc<dyn PhysicalExpr>>;
+}
Review Comment:
Note: if users need the `table_schema` they can bind that inside of
`TableProvider::scan`
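A toy mirror of how an implementer might use this trait for shredded fields: the real trait works on `SchemaRef` and `Arc<dyn PhysicalExpr>`, but the sketch below substitutes a plain list of column names and a string expression to stay self-contained, and the column spellings (`struct_col['a']`, `_struct_col.a`) are hypothetical, loosely following the PR's `struct_field_rewrite` example:

```rust
/// Toy stand-in for the PR's FileExpressionRewriter trait.
trait FileExpressionRewriter {
    /// Rewrite `expr` in the context of one particular file's schema.
    fn rewrite(&self, file_schema: &[&str], expr: String) -> String;
}

/// Rewrites the struct field access `struct_col['a']` to the flattened
/// column `_struct_col.a`, but only when this file actually contains it.
struct ShreddedFieldRewriter;

impl FileExpressionRewriter for ShreddedFieldRewriter {
    fn rewrite(&self, file_schema: &[&str], expr: String) -> String {
        let flattened = "_struct_col.a";
        if expr.contains("struct_col['a']") && file_schema.contains(&flattened) {
            expr.replace("struct_col['a']", flattened)
        } else {
            // no shredded column in this file: leave the filter untouched
            expr
        }
    }
}
```

Because the rewrite runs per file, the same filter can use the flattened column in files that have it and fall back to the struct access elsewhere.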
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r1992124608
##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -83,10 +87,23 @@ pub(super) struct ParquetOpener {
     pub enable_bloom_filter: bool,
     /// Schema adapter factory
     pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    /// Filter expression rewriter factory
+    pub filter_expression_rewriter: Option<Arc<dyn FileExpressionRewriter>>,
 }
 impl FileOpener for ParquetOpener {
     fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
+        // Note about schemas: we are actually dealing with _4_ different schemas here:
+        // - The table schema as defined by the TableProvider. This is what the user
+        //   sees, what they get when they `SELECT * FROM table`, etc.
+        // - The "virtual" file schema: this is the table schema minus any hive
+        //   partition columns. This is what the file schema is coerced to.
+        // - The physical file schema: this is the schema as defined by the parquet
+        //   file. This is what the parquet file actually contains.
+        // - The filter schema: a hybrid of the virtual file schema and the physical
+        //   file schema.
+        //   If a filter is rewritten to reference columns that are in the physical
+        //   file schema but not the virtual file schema, we need to add those
+        //   columns to the filter schema so that the filter can be evaluated.
+        //   This schema is generated by taking any columns from the virtual file
+        //   schema that are referenced by the filter and adding any columns from
+        //   the physical file schema that are referenced by the filter but not in
+        //   the virtual file schema.
+        //   Columns from the virtual file schema are added in the order they
+        //   appear in the virtual file schema.
+        //   The columns from the physical file schema are always added to the end
+        //   of the schema, in the order they appear in the physical file schema.
+        //
+        // I think it might be wise to do some renaming of parameters where
+        // possible, e.g. rename `file_schema` to
+        // `table_schema_without_partition_columns` and `physical_file_schema`
+        // or something like that.
Review Comment:
This is an interesting bit to ponder upon
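The filter-schema construction described in that comment can be sketched with schemas as plain ordered lists of column names (not real Arrow schemas): keep the virtual-file-schema columns the filter references, in virtual order, then append referenced physical-only columns, in physical order:

```rust
/// Build the hybrid "filter schema" from the virtual file schema and the
/// physical file schema, given the set of columns the filter references.
/// Schemas here are just ordered lists of column names for illustration.
fn filter_schema(
    virtual_schema: &[&str],
    physical_schema: &[&str],
    referenced: &[&str],
) -> Vec<String> {
    // 1. Referenced virtual-schema columns, in virtual-schema order.
    let mut out: Vec<String> = virtual_schema
        .iter()
        .filter(|c| referenced.contains(c))
        .map(|c| c.to_string())
        .collect();
    // 2. Referenced physical-only columns appended at the end,
    //    in physical-schema order.
    for c in physical_schema {
        if referenced.contains(c) && !virtual_schema.contains(c) {
            out.push(c.to_string());
        }
    }
    out
}
```

For example, with virtual schema `[a, b, c]`, physical schema `[a, b, _v.x, _v.y]`, and a filter referencing `b`, `_v.y`, and `a`, the filter schema comes out as `[a, b, _v.y]`.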
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on PR #15057: URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2718504764 Okay folks, sorry for the churn; I thought this was in a better state than it ended up being. I've now reworked it to minimize the diff and make sure all existing tests pass. I'm going to add tests for the new functionality now to complement the example.
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on PR #15057: URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2715766181 Okay I think I can answer my own question: https://github.com/pydantic/datafusion/blob/38356998059a2d08113401ea8111f238899ab0b8/datafusion/core/src/datasource/listing/table.rs#L961-L995 Based on this it seems like it's safe to mark filters as exact if they are getting pushed down 😄
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on PR #15057: URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2715592559 The main issue I've found with this approach is marking filters as `Exact` or `Inexact`. In particular, unless you mark them as `Exact`, DataFusion will still need to pull the possibly large unshredded data to re-apply filters in a `FilterExec`. This doesn't completely kill performance, because if the filter is selective there is less data to re-filter, but the worst case scenario is possibly worse than not having this feature at all. But I feel like this is a consequence of filter pushdown in general? Ignoring this change, if I have a `TableProvider` that returns a `DataSourceExec` and I have filter pushdown enabled, should I be marking _all_ of my filters as `Exact`? That seems dangerous given that it's not documented anywhere that filter pushdown supports all filters that `FilterExec` does, and things like https://github.com/apache/datafusion/blob/9382add72b929c553ca4976d1423d8ebbc80889d/datafusion/datasource-parquet/src/row_filter.rs#L333-L336.
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on PR #15057: URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2712077738 @alamb I think this is ready for a first round of review when you have a chance!
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on code in PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#discussion_r1988140038
##
datafusion/datasource-parquet/src/opener.rs:
##
@@ -111,18 +109,18 @@ impl FileOpener for ParquetOpener {
             .schema_adapter_factory
             .create(projected_schema, Arc::clone(&self.table_schema));
         let predicate = self.predicate.clone();
-        let pruning_predicate = self.pruning_predicate.clone();
-        let page_pruning_predicate = self.page_pruning_predicate.clone();
         let table_schema = Arc::clone(&self.table_schema);
+        let filter_expression_rewriter_factory =
+            self.filter_expression_rewriter_factory.clone();
         let reorder_predicates = self.reorder_filters;
         let pushdown_filters = self.pushdown_filters;
-        let enable_page_index = should_enable_page_index(
-            self.enable_page_index,
-            &self.page_pruning_predicate,
-        );
+        let enable_page_index = self.enable_page_index && self.predicate.is_some();
Review Comment:
We need to decide upfront if we need the page index or not.
Previously the pruning predicate got calculated from the "file schema" that
users pass in (which is obviously _not_ the schema of the physical file we are
reading; it is expected to be the table schema minus partition columns; I think
that should be renamed / reworked, but that's for another day). This seems
wrong to me: I think it should be using the physical file schema, since that's
what the row-group/page stats it will be loading match. So *I think* moving it
here actually improves some edge cases. _But_ we now have to start loading the
file before we know if we have any page index pruning predicate, so we may
load the page index and then realize we don't have any applicable filters.
That should only happen if we have a predicate and can't build a page pruning
predicate from the transformed predicate; I don't think that will be all that
common.
##
datafusion-examples/examples/struct_field_rewrite.rs:
##
@@ -0,0 +1,353 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray, StructArray};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow_schema::Fields;
+use async_trait::async_trait;
+
+use datafusion::assert_batches_eq;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::tree_node::{
+Transformed, TransformedResult, TreeNode, TreeNodeRewriter,
+};
+use datafusion::common::{DFSchema, Result};
+use datafusion::datasource::file_expr_rewriter::FileExpressionRewriter;
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
+use datafusion::execution::context::SessionContext;
+use datafusion::execution::object_store::ObjectStoreUrl;
+use datafusion::logical_expr::utils::conjunction;
+use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_expr::{expressions, ScalarFunctionExpr};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::lit;
+use futures::StreamExt;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+
+// Example showing how to implement custom filter rewriting for struct fields.
+//
+// In this example, we have a table with a struct column like:
+// struct_col: {"a": 1, "b": "foo"}
+//
+// Our custom TableProvider will use a FilterExpressionRewriter to rewrite
+// expressions like `struct_col['a'] = 10` to use a flattened column name
+// `_struct_col.a` if it exists in the file schema.
+#[tokio::main]
+async fn main() -> Result<()> {
+    println!("=== Creating example data with structs and flattened fields ===");
+    println!("NOTE: This example demonstrates filter expression rewriting for struct field access.");
+    println!("      We deliberately create different values in the struct
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on PR #15057: URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2712042344 The example is now working and even does stats pruning of shredded columns 🚀
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on PR #15057: URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2707505442 Ok the example is now working and I think the overall approach seems good but probably needs a lot of tweaking.
Re: [PR] Per file filter evaluation [datafusion]
adriangb commented on PR #15057:
URL: https://github.com/apache/datafusion/pull/15057#issuecomment-2705670873
The example is not working yet. It gets `Error:
Shared(ArrowError(ExternalError(External(ComputeError("Error evaluating filter
predicate: Internal(\"PhysicalExpr Column references column '_user_info.age' at
index 0 (zero-based) but input schema only has 0 columns: []\")"))), None))`. I
think some work will be needed with how the rewrites interact with projections
for the filters.
