This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e01fa0ce42 Cut `Parquet` over to PhysicalExprAdapter, remove `SchemaAdapter` (#18998)
e01fa0ce42 is described below
commit e01fa0ce42add38d0e1894fb970dc9f8c3742a43
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Fri Dec 5 08:36:00 2025 +0100
Cut `Parquet` over to PhysicalExprAdapter, remove `SchemaAdapter` (#18998)
Chips away at https://github.com/apache/datafusion/issues/14993 and
https://github.com/apache/datafusion/issues/16800
Changes made in this PR:
- Update DefaultPhysicalExprAdapter to handle struct column evolution
- Remove use of SchemaAdapter from row filter / predicate pushdown /
late materialization. It was already doing little there, except where tests
checked specific behavior and had not yet been updated to use
PhysicalExprAdapter
- Change projection handling to use `PhysicalExprAdapter` instead of
`SchemaAdapter`
- Keep the intermediary `Vec<usize>` so we can use `ProjectionMask::roots`
and defer the complexity of implementing `ProjectionExprs` ->
`ProjectionMask` to a later PR (there is a draft in #18966 of what
that might look like).
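
As a migration aid, here is a minimal sketch of the replacement extension point, loosely modeled on the test adapters touched in this PR: a `PhysicalExprAdapterFactory` whose adapter rewrites references to columns that exist in the logical (table) schema but are missing from the physical (file) schema into typed NULL literals. The `FillMissingWithNullFactory` name is made up for illustration, and the code is a sketch based on the traits used in this diff, not a definitive API reference.

```rust
use std::sync::Arc;

use arrow_schema::{FieldRef, SchemaRef};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Result, ScalarValue};
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

/// Illustrative factory: columns present in the logical (table) schema but
/// absent from the physical (file) schema are replaced with NULL literals.
#[derive(Debug)]
struct FillMissingWithNullFactory;

impl PhysicalExprAdapterFactory for FillMissingWithNullFactory {
    fn create(
        &self,
        logical_file_schema: SchemaRef,
        physical_file_schema: SchemaRef,
    ) -> Arc<dyn PhysicalExprAdapter> {
        Arc::new(FillMissingWithNull {
            logical_file_schema,
            physical_file_schema,
        })
    }
}

#[derive(Debug)]
struct FillMissingWithNull {
    logical_file_schema: SchemaRef,
    physical_file_schema: SchemaRef,
}

impl PhysicalExprAdapter for FillMissingWithNull {
    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
        expr.transform(|e| {
            if let Some(column) = e.as_any().downcast_ref::<Column>() {
                // The column exists in the table schema but not in this file:
                // rewrite the reference into a typed NULL literal instead of
                // patching batches after they are read.
                if self.physical_file_schema.index_of(column.name()).is_err() {
                    let field =
                        self.logical_file_schema.field_with_name(column.name())?;
                    let null_value = ScalarValue::try_from(field.data_type())?;
                    return Ok(Transformed::yes(Arc::new(Literal::new(null_value))
                        as Arc<dyn PhysicalExpr>));
                }
            }
            Ok(Transformed::no(e))
        })
        .data()
    }

    fn with_partition_values(
        &self,
        _partition_values: Vec<(FieldRef, ScalarValue)>,
    ) -> Arc<dyn PhysicalExprAdapter> {
        Arc::new(Self {
            logical_file_schema: Arc::clone(&self.logical_file_schema),
            physical_file_schema: Arc::clone(&self.physical_file_schema),
        })
    }
}
```

A factory like this would be attached via `FileScanConfigBuilder::with_expr_adapter` or `ListingTableConfig::with_expr_adapter_factory`, as the updated tests in this diff do.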
---
datafusion/core/src/datasource/mod.rs | 122 +++--
.../core/src/datasource/physical_plan/parquet.rs | 4 +-
datafusion/core/tests/parquet/schema_adapter.rs | 390 ++++-----------
.../schema_adapter_integration_tests.rs | 540 +++++++++++++++++----
datafusion/datasource-parquet/src/file_format.rs | 5 +-
datafusion/datasource-parquet/src/opener.rs | 243 ++--------
datafusion/datasource-parquet/src/row_filter.rs | 127 ++---
datafusion/datasource-parquet/src/source.rs | 91 +---
.../physical-expr-adapter/src/schema_rewriter.rs | 211 ++++++--
datafusion/physical-expr/src/expressions/cast.rs | 3 +
datafusion/physical-expr/src/projection.rs | 43 ++
datafusion/pruning/src/pruning_predicate.rs | 15 +
datafusion/sqllogictest/test_files/timestamps.slt | 70 +++
docs/source/library-user-guide/upgrading.md | 11 +-
14 files changed, 1020 insertions(+), 855 deletions(-)
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index 620e389a0f..28faea9a68 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -55,30 +55,35 @@ mod tests {
use crate::prelude::SessionContext;
use ::object_store::{path::Path, ObjectMeta};
use arrow::{
- array::{Int32Array, StringArray},
- datatypes::{DataType, Field, Schema, SchemaRef},
+ array::Int32Array,
+ datatypes::{DataType, Field, FieldRef, Schema, SchemaRef},
record_batch::RecordBatch,
};
- use datafusion_common::{record_batch, test_util::batches_to_sort_string};
+ use datafusion_common::{
+ record_batch,
+ test_util::batches_to_sort_string,
+ tree_node::{Transformed, TransformedResult, TreeNode},
+ Result, ScalarValue,
+ };
use datafusion_datasource::{
- file::FileSource,
file_scan_config::FileScanConfigBuilder,
- schema_adapter::{
- DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
- SchemaMapper,
- },
- source::DataSourceExec,
+ schema_adapter::DefaultSchemaAdapterFactory, source::DataSourceExec,
PartitionedFile,
};
use datafusion_datasource_parquet::source::ParquetSource;
+ use datafusion_physical_expr::expressions::{Column, Literal};
+ use datafusion_physical_expr_adapter::{
+ PhysicalExprAdapter, PhysicalExprAdapterFactory,
+ };
+ use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::collect;
use std::{fs, sync::Arc};
use tempfile::TempDir;
#[tokio::test]
- async fn can_override_schema_adapter() {
-        // Test shows that SchemaAdapter can add a column that doesn't existing in the
-        // record batches returned from parquet. This can be useful for schema evolution
+ async fn can_override_physical_expr_adapter() {
+        // Test shows that PhysicalExprAdapter can add a column that doesn't exist in the
+        // record batches returned from parquet. This can be useful for schema evolution
// where older files may not have all columns.
use datafusion_execution::object_store::ObjectStoreUrl;
@@ -124,12 +129,11 @@ mod tests {
let f2 = Field::new("extra_column", DataType::Utf8, true);
let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
- let source = ParquetSource::new(Arc::clone(&schema))
- .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}))
- .unwrap();
+ let source = Arc::new(ParquetSource::new(Arc::clone(&schema)));
let base_conf =
             FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
.with_file(partitioned_file)
+            .with_expr_adapter(Some(Arc::new(TestPhysicalExprAdapterFactory)))
.build();
let parquet_exec = DataSourceExec::from_data_source(base_conf);
@@ -200,72 +204,54 @@ mod tests {
}
#[derive(Debug)]
- struct TestSchemaAdapterFactory;
+ struct TestPhysicalExprAdapterFactory;
- impl SchemaAdapterFactory for TestSchemaAdapterFactory {
+ impl PhysicalExprAdapterFactory for TestPhysicalExprAdapterFactory {
fn create(
&self,
- projected_table_schema: SchemaRef,
- _table_schema: SchemaRef,
- ) -> Box<dyn SchemaAdapter> {
- Box::new(TestSchemaAdapter {
- table_schema: projected_table_schema,
+ logical_file_schema: SchemaRef,
+ physical_file_schema: SchemaRef,
+ ) -> Arc<dyn PhysicalExprAdapter> {
+ Arc::new(TestPhysicalExprAdapter {
+ logical_file_schema,
+ physical_file_schema,
})
}
}
- struct TestSchemaAdapter {
- /// Schema for the table
- table_schema: SchemaRef,
+ #[derive(Debug)]
+ struct TestPhysicalExprAdapter {
+ logical_file_schema: SchemaRef,
+ physical_file_schema: SchemaRef,
}
- impl SchemaAdapter for TestSchemaAdapter {
-        fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
- let field = self.table_schema.field(index);
- Some(file_schema.fields.find(field.name())?.0)
- }
-
- fn map_schema(
- &self,
- file_schema: &Schema,
- ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-            let mut projection = Vec::with_capacity(file_schema.fields().len());
-
-            for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
-                if self.table_schema.fields().find(file_field.name()).is_some() {
- projection.push(file_idx);
+ impl PhysicalExprAdapter for TestPhysicalExprAdapter {
+        fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
+ expr.transform(|e| {
+ if let Some(column) = e.as_any().downcast_ref::<Column>() {
+                    // If column is "extra_column" and missing from physical schema, inject "foo"
+ if column.name() == "extra_column"
+                        && self.physical_file_schema.index_of("extra_column").is_err()
+ {
+ return Ok(Transformed::yes(Arc::new(Literal::new(
+ ScalarValue::Utf8(Some("foo".to_string())),
+ ))
+ as Arc<dyn PhysicalExpr>));
+ }
}
- }
-
- Ok((Arc::new(TestSchemaMapping {}), projection))
- }
- }
-
- #[derive(Debug)]
- struct TestSchemaMapping {}
-
- impl SchemaMapper for TestSchemaMapping {
- fn map_batch(
- &self,
- batch: RecordBatch,
- ) -> datafusion_common::Result<RecordBatch> {
- let f1 = Field::new("id", DataType::Int32, true);
- let f2 = Field::new("extra_column", DataType::Utf8, true);
-
- let schema = Arc::new(Schema::new(vec![f1, f2]));
-
- let extra_column = Arc::new(StringArray::from(vec!["foo"]));
- let mut new_columns = batch.columns().to_vec();
- new_columns.push(extra_column);
-
- Ok(RecordBatch::try_new(schema, new_columns).unwrap())
+ Ok(Transformed::no(e))
+ })
+ .data()
}
- fn map_column_statistics(
+ fn with_partition_values(
&self,
- _file_col_statistics: &[datafusion_common::ColumnStatistics],
-        ) -> datafusion_common::Result<Vec<datafusion_common::ColumnStatistics>> {
- unimplemented!()
+ _partition_values: Vec<(FieldRef, ScalarValue)>,
+ ) -> Arc<dyn PhysicalExprAdapter> {
+ Arc::new(TestPhysicalExprAdapter {
+ logical_file_schema: self.logical_file_schema.clone(),
+ physical_file_schema: self.physical_file_schema.clone(),
+ })
}
}
}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 4d7c4a8788..6ed01cde14 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -1257,7 +1257,7 @@ mod tests {
("c3", c3.clone()),
]);
- // batch2: c3(int8), c2(int64), c1(string), c4(string)
+ // batch2: c3(date64), c2(int64), c1(string)
let batch2 = create_batch(vec![("c3", c4), ("c2", c2), ("c1", c1)]);
let table_schema = Schema::new(vec![
@@ -1272,7 +1272,7 @@ mod tests {
.round_trip_to_batches(vec![batch1, batch2])
.await;
assert_contains!(read.unwrap_err().to_string(),
-            "Cannot cast file schema field c3 of type Date64 to table schema field of type Int8");
+            "Cannot cast column 'c3' from 'Date64' (physical data type) to 'Int8' (logical data type)");
}
#[tokio::test]
diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs
index 0e76d626aa..541785319c 100644
--- a/datafusion/core/tests/parquet/schema_adapter.rs
+++ b/datafusion/core/tests/parquet/schema_adapter.rs
@@ -17,8 +17,7 @@
use std::sync::Arc;
-use arrow::array::{record_batch, RecordBatch, RecordBatchOptions};
-use arrow::compute::{cast_with_options, CastOptions};
+use arrow::array::{record_batch, RecordBatch};
use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
use bytes::{BufMut, BytesMut};
use datafusion::assert_batches_eq;
@@ -29,14 +28,8 @@ use datafusion::datasource::listing::{
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::DataFusionError;
-use datafusion_common::{ColumnStatistics, ScalarValue};
-use datafusion_datasource::file::FileSource;
-use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
-use datafusion_datasource::schema_adapter::{
-    DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
-};
+use datafusion_common::ScalarValue;
use datafusion_datasource::ListingTableUrl;
-use datafusion_datasource_parquet::source::ParquetSource;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_expr::expressions::{self, Column};
use datafusion_physical_expr::PhysicalExpr;
@@ -44,7 +37,6 @@ use datafusion_physical_expr_adapter::{
     DefaultPhysicalExprAdapter, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter,
PhysicalExprAdapterFactory,
};
-use itertools::Itertools;
use object_store::{memory::InMemory, path::Path, ObjectStore};
use parquet::arrow::ArrowWriter;
@@ -59,98 +51,10 @@ async fn write_parquet(batch: RecordBatch, store: Arc<dyn ObjectStore>, path: &s
store.put(&Path::from(path), data.into()).await.unwrap();
}
-#[derive(Debug)]
-struct CustomSchemaAdapterFactory;
-
-impl SchemaAdapterFactory for CustomSchemaAdapterFactory {
- fn create(
- &self,
- projected_table_schema: SchemaRef,
- _table_schema: SchemaRef,
- ) -> Box<dyn SchemaAdapter> {
- Box::new(CustomSchemaAdapter {
- logical_file_schema: projected_table_schema,
- })
- }
-}
-
-#[derive(Debug)]
-struct CustomSchemaAdapter {
- logical_file_schema: SchemaRef,
-}
-
-impl SchemaAdapter for CustomSchemaAdapter {
-    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
- for (idx, field) in file_schema.fields().iter().enumerate() {
- if field.name() == self.logical_file_schema.field(index).name() {
- return Some(idx);
- }
- }
- None
- }
-
- fn map_schema(
- &self,
- file_schema: &Schema,
- ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
- let projection = (0..file_schema.fields().len()).collect_vec();
- Ok((
- Arc::new(CustomSchemaMapper {
- logical_file_schema: Arc::clone(&self.logical_file_schema),
- }),
- projection,
- ))
- }
-}
-
-#[derive(Debug)]
-struct CustomSchemaMapper {
- logical_file_schema: SchemaRef,
-}
-
-impl SchemaMapper for CustomSchemaMapper {
- fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
- let mut output_columns =
- Vec::with_capacity(self.logical_file_schema.fields().len());
- for field in self.logical_file_schema.fields() {
- if let Some(array) = batch.column_by_name(field.name()) {
- output_columns.push(cast_with_options(
- array,
- field.data_type(),
- &CastOptions::default(),
- )?);
- } else {
- // Create a new array with the default value for the field type
- let default_value = match field.data_type() {
- DataType::Int64 => ScalarValue::Int64(Some(0)),
- DataType::Utf8 => ScalarValue::Utf8(Some("a".to_string())),
-                    _ => unimplemented!("Unsupported data type: {}", field.data_type()),
- };
- output_columns
-                    .push(default_value.to_array_of_size(batch.num_rows()).unwrap());
- }
- }
- let batch = RecordBatch::try_new_with_options(
- Arc::clone(&self.logical_file_schema),
- output_columns,
- &RecordBatchOptions::new().with_row_count(Some(batch.num_rows())),
- )
- .unwrap();
- Ok(batch)
- }
-
- fn map_column_statistics(
- &self,
- _file_col_statistics: &[ColumnStatistics],
- ) -> Result<Vec<ColumnStatistics>> {
- Ok(vec![
- ColumnStatistics::new_unknown();
- self.logical_file_schema.fields().len()
- ])
- }
-}
-
-// Implement a custom PhysicalExprAdapterFactory that fills in missing columns with the default value for the field type
+// Implement a custom PhysicalExprAdapterFactory that fills in missing columns with
+// the default value for the field type:
+// - Int64 columns are filled with `1`
+// - Utf8 columns are filled with `'b'`
#[derive(Debug)]
struct CustomPhysicalExprAdapterFactory;
@@ -264,13 +168,13 @@ async fn test_custom_schema_adapter_and_custom_expression_adapter() {
);
assert!(!ctx.state().config().collect_statistics());
+    // Test with DefaultPhysicalExprAdapterFactory - missing columns are filled with NULL
let listing_table_config =
ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
.infer_options(&ctx.state())
.await
.unwrap()
.with_schema(table_schema.clone())
- .with_schema_adapter_factory(Arc::new(DefaultSchemaAdapterFactory))
.with_expr_adapter_factory(Arc::new(DefaultPhysicalExprAdapterFactory));
let table = ListingTable::try_new(listing_table_config).unwrap();
@@ -293,36 +197,72 @@ async fn test_custom_schema_adapter_and_custom_expression_adapter() {
];
assert_batches_eq!(expected, &batches);
- // Test using a custom schema adapter and no explicit physical expr adapter
-    // This should use the custom schema adapter both for projections and predicate pushdown
+ // Test with a custom physical expr adapter
+ // PhysicalExprAdapterFactory now handles both predicates AND projections
+ // CustomPhysicalExprAdapterFactory fills missing columns with 'b' for Utf8
let listing_table_config =
ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
.infer_options(&ctx.state())
.await
.unwrap()
.with_schema(table_schema.clone())
- .with_schema_adapter_factory(Arc::new(CustomSchemaAdapterFactory));
+        .with_expr_adapter_factory(Arc::new(CustomPhysicalExprAdapterFactory));
let table = ListingTable::try_new(listing_table_config).unwrap();
ctx.deregister_table("t").unwrap();
ctx.register_table("t", Arc::new(table)).unwrap();
let batches = ctx
- .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'a'")
+ .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'b'")
.await
.unwrap()
.collect()
.await
.unwrap();
+    // With CustomPhysicalExprAdapterFactory, missing column c2 is filled with 'b'
+    // in both the predicate (c2 = 'b' becomes 'b' = 'b' -> true) and the projection
let expected = [
"+----+----+",
"| c2 | c1 |",
"+----+----+",
- "| a | 2 |",
+ "| b | 2 |",
"+----+----+",
];
assert_batches_eq!(expected, &batches);
+}
+
+/// Test demonstrating how to implement a custom PhysicalExprAdapterFactory
+/// that fills missing columns with non-null default values.
+///
+/// This is the recommended migration path for users who previously used
+/// SchemaAdapterFactory to fill missing columns with default values.
+/// Instead of transforming batches after reading (SchemaAdapter::map_batch),
+/// the PhysicalExprAdapterFactory rewrites expressions to use literals for
+/// missing columns, achieving the same result more efficiently.
+#[tokio::test]
+async fn test_physical_expr_adapter_with_non_null_defaults() {
+ // File only has c1 column
+ let batch = record_batch!(("c1", Int32, [10, 20, 30])).unwrap();
+
+ let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+ let store_url = ObjectStoreUrl::parse("memory://").unwrap();
+ write_parquet(batch, store.clone(), "defaults_test.parquet").await;
+
+    // Table schema has additional columns c2 (Utf8) and c3 (Int64) that don't exist in file
+ let table_schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int64, false), // type differs from file (Int32 vs Int64)
+ Field::new("c2", DataType::Utf8, true), // missing from file
+ Field::new("c3", DataType::Int64, true), // missing from file
+ ]));
+
+ let mut cfg = SessionConfig::new()
+ .with_collect_statistics(false)
+ .with_parquet_pruning(false);
+ cfg.options_mut().execution.parquet.pushdown_filters = true;
+ let ctx = SessionContext::new_with_config(cfg);
+ ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
- // Do the same test but with a custom physical expr adapter
-    // Now the default schema adapter will be used for projections, but the custom physical expr adapter will be used for predicate pushdown
+ // CustomPhysicalExprAdapterFactory fills:
+ // - missing Utf8 columns with 'b'
+ // - missing Int64 columns with 1
let listing_table_config =
ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
.infer_options(&ctx.state())
@@ -330,218 +270,66 @@ async fn test_custom_schema_adapter_and_custom_expression_adapter() {
.unwrap()
.with_schema(table_schema.clone())
.with_expr_adapter_factory(Arc::new(CustomPhysicalExprAdapterFactory));
+
let table = ListingTable::try_new(listing_table_config).unwrap();
- ctx.deregister_table("t").unwrap();
ctx.register_table("t", Arc::new(table)).unwrap();
+
+ // Query all columns - missing columns should have default values
let batches = ctx
- .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'b'")
+ .sql("SELECT c1, c2, c3 FROM t ORDER BY c1")
.await
.unwrap()
.collect()
.await
.unwrap();
+
+ // c1 is cast from Int32 to Int64, c2 defaults to 'b', c3 defaults to 1
let expected = [
- "+----+----+",
- "| c2 | c1 |",
- "+----+----+",
- "| | 2 |",
- "+----+----+",
+ "+----+----+----+",
+ "| c1 | c2 | c3 |",
+ "+----+----+----+",
+ "| 10 | b | 1 |",
+ "| 20 | b | 1 |",
+ "| 30 | b | 1 |",
+ "+----+----+----+",
];
assert_batches_eq!(expected, &batches);
-    // If we use both then the custom physical expr adapter will be used for predicate pushdown and the custom schema adapter will be used for projections
- let listing_table_config =
- ListingTableConfig::new(ListingTableUrl::parse("memory:///").unwrap())
- .infer_options(&ctx.state())
- .await
- .unwrap()
- .with_schema(table_schema.clone())
- .with_schema_adapter_factory(Arc::new(CustomSchemaAdapterFactory))
-        .with_expr_adapter_factory(Arc::new(CustomPhysicalExprAdapterFactory));
- let table = ListingTable::try_new(listing_table_config).unwrap();
- ctx.deregister_table("t").unwrap();
- ctx.register_table("t", Arc::new(table)).unwrap();
+ // Verify predicates work with default values
+ // c3 = 1 should match all rows since default is 1
let batches = ctx
- .sql("SELECT c2, c1 FROM t WHERE c1 = 2 AND c2 = 'b'")
+ .sql("SELECT c1 FROM t WHERE c3 = 1 ORDER BY c1")
.await
.unwrap()
.collect()
.await
.unwrap();
+
+ #[rustfmt::skip]
let expected = [
- "+----+----+",
- "| c2 | c1 |",
- "+----+----+",
- "| a | 2 |",
- "+----+----+",
+ "+----+",
+ "| c1 |",
+ "+----+",
+ "| 10 |",
+ "| 20 |",
+ "| 30 |",
+ "+----+",
];
assert_batches_eq!(expected, &batches);
-}
-
-/// A test schema adapter factory that adds prefix to column names
-#[derive(Debug)]
-struct PrefixAdapterFactory {
- prefix: String,
-}
-
-impl SchemaAdapterFactory for PrefixAdapterFactory {
- fn create(
- &self,
- projected_table_schema: SchemaRef,
- _table_schema: SchemaRef,
- ) -> Box<dyn SchemaAdapter> {
- Box::new(PrefixAdapter {
- input_schema: projected_table_schema,
- prefix: self.prefix.clone(),
- })
- }
-}
-
-/// A test schema adapter that adds prefix to column names
-#[derive(Debug)]
-struct PrefixAdapter {
- input_schema: SchemaRef,
- prefix: String,
-}
-
-impl SchemaAdapter for PrefixAdapter {
-    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
- let field = self.input_schema.field(index);
- file_schema.fields.find(field.name()).map(|(i, _)| i)
- }
-
- fn map_schema(
- &self,
- file_schema: &Schema,
- ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
- let mut projection = Vec::with_capacity(file_schema.fields().len());
- for (file_idx, file_field) in file_schema.fields().iter().enumerate() {
- if self.input_schema.fields().find(file_field.name()).is_some() {
- projection.push(file_idx);
- }
- }
-
- // Create a schema mapper that adds a prefix to column names
- #[derive(Debug)]
- struct PrefixSchemaMapping {
-        // Keep only the prefix field which is actually used in the implementation
- prefix: String,
- }
-
- impl SchemaMapper for PrefixSchemaMapping {
- fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
- // Create a new schema with prefixed field names
- let prefixed_fields: Vec<Field> = batch
- .schema()
- .fields()
- .iter()
- .map(|field| {
- Field::new(
- format!("{}{}", self.prefix, field.name()),
- field.data_type().clone(),
- field.is_nullable(),
- )
- })
- .collect();
- let prefixed_schema = Arc::new(Schema::new(prefixed_fields));
-
-                // Create a new batch with the prefixed schema but the same data
- let options = RecordBatchOptions::default();
- RecordBatch::try_new_with_options(
- prefixed_schema,
- batch.columns().to_vec(),
- &options,
- )
- .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
- }
-
- fn map_column_statistics(
- &self,
- stats: &[ColumnStatistics],
- ) -> Result<Vec<ColumnStatistics>> {
- // For testing, just return the input statistics
- Ok(stats.to_vec())
- }
- }
- Ok((
- Arc::new(PrefixSchemaMapping {
- prefix: self.prefix.clone(),
- }),
- projection,
- ))
- }
-}
-
-#[test]
-fn test_apply_schema_adapter_with_factory() {
- // Create a schema
- let schema = Arc::new(Schema::new(vec![
- Field::new("id", DataType::Int32, false),
- Field::new("name", DataType::Utf8, true),
- ]));
-
- // Create a parquet source
- let source = ParquetSource::new(schema.clone());
-
- // Create a file scan config with source that has a schema adapter factory
- let factory = Arc::new(PrefixAdapterFactory {
- prefix: "test_".to_string(),
- });
-
-    let file_source = source.clone().with_schema_adapter_factory(factory).unwrap();
-
- let config =
-        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
- .build();
-
- // Apply schema adapter to a new source
- let result_source = source.apply_schema_adapter(&config).unwrap();
-
- // Verify the adapter was applied
- assert!(result_source.schema_adapter_factory().is_some());
-
- // Create adapter and test it produces expected schema
- let adapter_factory = result_source.schema_adapter_factory().unwrap();
- let adapter = adapter_factory.create(schema.clone(), schema.clone());
-
- // Create a dummy batch to test the schema mapping
- let dummy_batch = RecordBatch::new_empty(schema.clone());
-
- // Get the file schema (which is the same as the table schema in this test)
- let (mapper, _) = adapter.map_schema(&schema).unwrap();
-
- // Apply the mapping to get the output schema
- let mapped_batch = mapper.map_batch(dummy_batch).unwrap();
- let output_schema = mapped_batch.schema();
-
- // Check the column names have the prefix
- assert_eq!(output_schema.field(0).name(), "test_id");
- assert_eq!(output_schema.field(1).name(), "test_name");
-}
-
-#[test]
-fn test_apply_schema_adapter_without_factory() {
- // Create a schema
- let schema = Arc::new(Schema::new(vec![
- Field::new("id", DataType::Int32, false),
- Field::new("name", DataType::Utf8, true),
- ]));
-
- // Create a parquet source
- let source = ParquetSource::new(schema.clone());
-
- // Convert to Arc<dyn FileSource>
- let file_source: Arc<dyn FileSource> = Arc::new(source.clone());
-
- // Create a file scan config without a schema adapter factory
- let config =
-        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
- .build();
-
- // Apply schema adapter function - should pass through the source unchanged
- let result_source = source.apply_schema_adapter(&config).unwrap();
+ // c3 = 999 should match no rows
+ let batches = ctx
+ .sql("SELECT c1 FROM t WHERE c3 = 999")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
- // Verify no adapter was applied
- assert!(result_source.schema_adapter_factory().is_none());
+ #[rustfmt::skip]
+ let expected = [
+ "++",
+ "++",
+ ];
+ assert_batches_eq!(expected, &batches);
}
diff --git a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs
index 1915298164..f0d09d7134 100644
--- a/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs
+++ b/datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs
@@ -18,36 +18,66 @@
use std::sync::Arc;
use arrow::array::RecordBatch;
-use arrow_schema::{DataType, Field, Schema, SchemaRef};
+
+use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
use bytes::{BufMut, BytesMut};
use datafusion::common::Result;
+use datafusion::config::{ConfigOptions, TableParquetOptions};
use datafusion::datasource::listing::PartitionedFile;
+#[cfg(feature = "parquet")]
+use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::datasource::physical_plan::{
- ArrowSource, CsvSource, FileSource, JsonSource, ParquetSource,
+ ArrowSource, CsvSource, FileSource, JsonSource,
};
+use datafusion::logical_expr::{col, lit};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_common::config::CsvOptions;
-use datafusion_common::ColumnStatistics;
+use datafusion_common::record_batch;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::{ColumnStatistics, ScalarValue};
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::schema_adapter::{
SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
};
+
+use datafusion::assert_batches_eq;
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource::TableSchema;
use datafusion_execution::object_store::ObjectStoreUrl;
+use datafusion_expr::Expr;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::planner::logical2physical;
+use datafusion_physical_expr::projection::ProjectionExprs;
+use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use object_store::{memory::InMemory, path::Path, ObjectStore};
use parquet::arrow::ArrowWriter;
 async fn write_parquet(batch: RecordBatch, store: Arc<dyn ObjectStore>, path: &str) {
+ write_batches_to_parquet(&[batch], store, path).await;
+}
+
+/// Write RecordBatches to a Parquet file with each batch in its own row group.
+async fn write_batches_to_parquet(
+ batches: &[RecordBatch],
+ store: Arc<dyn ObjectStore>,
+ path: &str,
+) -> usize {
let mut out = BytesMut::new().writer();
{
-        let mut writer = ArrowWriter::try_new(&mut out, batch.schema(), None).unwrap();
- writer.write(&batch).unwrap();
+ let mut writer =
+ ArrowWriter::try_new(&mut out, batches[0].schema(), None).unwrap();
+ for batch in batches {
+ writer.write(batch).unwrap();
+ writer.flush().unwrap();
+ }
writer.finish().unwrap();
}
let data = out.into_inner().freeze();
+ let file_size = data.len();
store.put(&Path::from(path), data.into()).await.unwrap();
+ file_size
}
/// A schema adapter factory that transforms column names to uppercase
@@ -156,71 +186,414 @@ impl SchemaMapper for UppercaseSchemaMapper {
}
}
-#[cfg(feature = "parquet")]
-#[tokio::test]
-async fn test_parquet_integration_with_schema_adapter() -> Result<()> {
- // Create test data
- let batch = RecordBatch::try_new(
- Arc::new(Schema::new(vec![
- Field::new("id", DataType::Int32, false),
- Field::new("name", DataType::Utf8, true),
- ])),
- vec![
- Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])),
- Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])),
- ],
- )?;
+/// A physical expression adapter factory that maps uppercase column names to lowercase
+#[derive(Debug)]
+struct UppercasePhysicalExprAdapterFactory;
- let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
- let store_url = ObjectStoreUrl::parse("memory://").unwrap();
- let path = "test.parquet";
- write_parquet(batch.clone(), store.clone(), path).await;
+impl PhysicalExprAdapterFactory for UppercasePhysicalExprAdapterFactory {
+ fn create(
+ &self,
+ logical_file_schema: SchemaRef,
+ physical_file_schema: SchemaRef,
+ ) -> Arc<dyn PhysicalExprAdapter> {
+ Arc::new(UppercasePhysicalExprAdapter {
+ logical_file_schema,
+ physical_file_schema,
+ })
+ }
+}
- // Get the actual file size from the object store
- let object_meta = store.head(&Path::from(path)).await?;
- let file_size = object_meta.size;
+#[derive(Debug)]
+struct UppercasePhysicalExprAdapter {
+ logical_file_schema: SchemaRef,
+ physical_file_schema: SchemaRef,
+}
- // Create a session context and register the object store
- let ctx = SessionContext::new();
- ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
+impl PhysicalExprAdapter for UppercasePhysicalExprAdapter {
+    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
+ expr.transform(|e| {
+ if let Some(column) = e.as_any().downcast_ref::<Column>() {
+                // Map uppercase column name (from logical schema) to lowercase (in physical file)
+ let lowercase_name = column.name().to_lowercase();
+                if let Ok(idx) = self.physical_file_schema.index_of(&lowercase_name) {
+ return Ok(Transformed::yes(
+ Arc::new(Column::new(&lowercase_name, idx))
+ as Arc<dyn PhysicalExpr>,
+ ));
+ }
+ }
+ Ok(Transformed::no(e))
+ })
+ .data()
+ }
- // Create a table schema with uppercase column names
- let table_schema = Arc::new(Schema::new(vec![
- Field::new("ID", DataType::Int32, false),
- Field::new("NAME", DataType::Utf8, true),
- ]));
+ fn with_partition_values(
+ &self,
+ _partition_values: Vec<(FieldRef, ScalarValue)>,
+ ) -> Arc<dyn PhysicalExprAdapter> {
+ Arc::new(Self {
+ logical_file_schema: self.logical_file_schema.clone(),
+ physical_file_schema: self.physical_file_schema.clone(),
+ })
+ }
+}
- // Create a ParquetSource with the adapter factory
- let file_source = ParquetSource::new(table_schema.clone())
- .with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {}))?;
+#[derive(Clone)]
+struct ParquetTestCase {
+ table_schema: TableSchema,
+ batches: Vec<RecordBatch>,
+ predicate: Option<Expr>,
+ projection: Option<ProjectionExprs>,
+ push_down_filters: bool,
+}
- let config = FileScanConfigBuilder::new(store_url, file_source)
- .with_file(PartitionedFile::new(path, file_size))
- .build();
+impl ParquetTestCase {
+ fn new(table_schema: TableSchema, batches: Vec<RecordBatch>) -> Self {
+ Self {
+ table_schema,
+ batches,
+ predicate: None,
+ projection: None,
+ push_down_filters: true,
+ }
+ }
- // Create a data source executor
- let exec = DataSourceExec::from_data_source(config);
+ fn push_down_filters(mut self, pushdown_filters: bool) -> Self {
+ self.push_down_filters = pushdown_filters;
+ self
+ }
- // Collect results
- let task_ctx = ctx.task_ctx();
- let stream = exec.execute(0, task_ctx)?;
- let batches = datafusion::physical_plan::common::collect(stream).await?;
+ fn with_predicate(mut self, predicate: Expr) -> Self {
+ self.predicate = Some(predicate);
+ self
+ }
- // There should be one batch
- assert_eq!(batches.len(), 1);
+ fn with_projection(mut self, projection: ProjectionExprs) -> Self {
+ self.projection = Some(projection);
+ self
+ }
- // Verify the schema has the uppercase column names
- let result_schema = batches[0].schema();
- assert_eq!(result_schema.field(0).name(), "ID");
- assert_eq!(result_schema.field(1).name(), "NAME");
+ async fn execute(self) -> Result<Vec<RecordBatch>> {
+ let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+ let store_url = ObjectStoreUrl::parse("memory://").unwrap();
+ let path = "test.parquet";
+ let file_size =
+ write_batches_to_parquet(&self.batches, store.clone(), path).await;
+
+ let ctx = SessionContext::new();
+ ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
+
+ let mut table_options = TableParquetOptions::default();
+        // controlled via ConfigOptions flag; ParquetSource ORs them so if either is true then pushdown is enabled
+ table_options.global.pushdown_filters = false;
+ let mut file_source = Arc::new(
+ ParquetSource::new(self.table_schema.table_schema().clone())
+ .with_table_parquet_options(table_options),
+ ) as Arc<dyn FileSource>;
+
+ if let Some(projection) = self.projection {
+            file_source = file_source.try_pushdown_projection(&projection)?.unwrap();
+ }
+
+ if let Some(predicate) = &self.predicate {
+ let filter_expr =
+ logical2physical(predicate, self.table_schema.table_schema());
+ let mut config = ConfigOptions::default();
+ config.execution.parquet.pushdown_filters = self.push_down_filters;
+            let result = file_source.try_pushdown_filters(vec![filter_expr], &config)?;
+ file_source = result.updated_node.unwrap();
+ }
+
+ let config = FileScanConfigBuilder::new(store_url.clone(), file_source)
+            .with_file(PartitionedFile::new(path, file_size as u64))
+ .with_expr_adapter(None)
+ .build();
+
+ let exec = DataSourceExec::from_data_source(config);
+ let task_ctx = ctx.task_ctx();
+ let stream = exec.execute(0, task_ctx)?;
+ datafusion::physical_plan::common::collect(stream).await
+ }
+}
+
+/// Test reading and filtering a Parquet file where the table schema is flipped (c, b, a) vs. the physical file schema (a, b, c)
+#[tokio::test]
+#[cfg(feature = "parquet")]
+async fn test_parquet_flipped_projection() -> Result<()> {
+ // Create test data with columns (a, b, c) - the file schema
+ let batch1 = record_batch!(
+ ("a", Int32, vec![1, 2]),
+ ("b", Utf8, vec!["x", "y"]),
+ ("c", Float64, vec![1.1, 2.2])
+ )?;
+ let batch2 = record_batch!(
+ ("a", Int32, vec![3]),
+ ("b", Utf8, vec!["z"]),
+ ("c", Float64, vec![3.3])
+ )?;
+
+ // Create a table schema with flipped column order (c, b, a)
+ let table_schema = Arc::new(Schema::new(vec![
+ Field::new("c", DataType::Float64, false),
+ Field::new("b", DataType::Utf8, true),
+ Field::new("a", DataType::Int32, false),
+ ]));
+ let table_schema = TableSchema::from_file_schema(table_schema);
+
+    let test_case = ParquetTestCase::new(table_schema.clone(), vec![batch1, batch2]);
+
+ // Test reading with flipped schema
+ let batches = test_case.clone().execute().await?;
+ #[rustfmt::skip]
+ let expected = [
+ "+-----+---+---+",
+ "| c | b | a |",
+ "+-----+---+---+",
+ "| 1.1 | x | 1 |",
+ "| 2.2 | y | 2 |",
+ "| 3.3 | z | 3 |",
+ "+-----+---+---+",
+ ];
+ assert_batches_eq!(expected, &batches);
+
+ // Test with a projection that selects (b, a)
+    let projection = ProjectionExprs::from_indices(&[1, 2], table_schema.table_schema());
+ let batches = test_case
+ .clone()
+ .with_projection(projection.clone())
+ .execute()
+ .await?;
+ #[rustfmt::skip]
+ let expected = [
+ "+---+---+",
+ "| b | a |",
+ "+---+---+",
+ "| x | 1 |",
+ "| y | 2 |",
+ "| z | 3 |",
+ "+---+---+",
+ ];
+ assert_batches_eq!(expected, &batches);
+
+ // Test with a filter on b, a
+ // a = 1 or b != 'foo' and a = 3 -> matches [{a=1,b=x},{b=z,a=3}]
+ let filter = col("a")
+ .eq(lit(1))
+ .or(col("b").not_eq(lit("foo")).and(col("a").eq(lit(3))));
+ let batches = test_case
+ .clone()
+ .with_projection(projection.clone())
+ .with_predicate(filter.clone())
+ .execute()
+ .await?;
+ #[rustfmt::skip]
+ let expected = [
+ "+---+---+",
+ "| b | a |",
+ "+---+---+",
+ "| x | 1 |",
+ "| z | 3 |",
+ "+---+---+",
+ ];
+ assert_batches_eq!(expected, &batches);
+
+ // Test with only statistics-based filter pushdown (no row-level filtering)
+    // Since we have 2 row groups and the filter matches rows in both, stats pruning alone won't filter any
+ let batches = test_case
+ .clone()
+ .with_projection(projection)
+ .with_predicate(filter)
+ .push_down_filters(false)
+ .execute()
+ .await?;
+ #[rustfmt::skip]
+ let expected = [
+ "+---+---+",
+ "| b | a |",
+ "+---+---+",
+ "| x | 1 |",
+ "| y | 2 |",
+ "| z | 3 |",
+ "+---+---+",
+ ];
+ assert_batches_eq!(expected, &batches);
+
+ // Test with a filter that can prune via statistics: a > 10 (no rows match)
+ let filter = col("a").gt(lit(10));
+ let batches = test_case
+ .clone()
+ .with_predicate(filter)
+ .push_down_filters(false)
+ .execute()
+ .await?;
+ // Stats show a has max=3, so a > 10 prunes all row groups
+ assert_eq!(batches.len(), 0);
+
+ // With a filter that matches only the first row group: a < 3
+ let filter = col("a").lt(lit(3));
+ let batches = test_case
+ .clone()
+ .with_predicate(filter)
+ .push_down_filters(false)
+ .execute()
+ .await?;
+ #[rustfmt::skip]
+ let expected = [
+ "+-----+---+---+",
+ "| c | b | a |",
+ "+-----+---+---+",
+ "| 1.1 | x | 1 |",
+ "| 2.2 | y | 2 |",
+ "+-----+---+---+",
+ ];
+ assert_batches_eq!(expected, &batches);
Ok(())
}
+/// Test reading a Parquet file that is missing a column specified in the table schema, which should get filled in with nulls by default.
+/// We test with the file having columns (a, c) and the table schema having (a, b, c)
+#[tokio::test]
#[cfg(feature = "parquet")]
+async fn test_parquet_missing_column() -> Result<()> {
+ // Create test data with columns (a, c) as 2 batches
+ // | a | c |
+ // |---|-----|
+ // | 1 | 1.1 |
+ // | 2 | 2.2 |
+ // | ~ | ~~~ |
+ // | 3 | 3.3 |
+    let batch1 = record_batch!(("a", Int32, vec![1, 2]), ("c", Float64, vec![1.1, 2.2]))?;
+    let batch2 = record_batch!(("a", Int32, vec![3]), ("c", Float64, vec![3.3]))?;
+
+ // Create a table schema with an extra column 'b' (a, b, c)
+ let logical_file_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Utf8, true),
+ Field::new("c", DataType::Float64, false),
+ ]));
+    let table_schema = TableSchema::from_file_schema(logical_file_schema.clone());
+
+    let test_case = ParquetTestCase::new(table_schema.clone(), vec![batch1, batch2]);
+
+ let batches = test_case.clone().execute().await?;
+ #[rustfmt::skip]
+ let expected = [
+ "+---+---+-----+",
+ "| a | b | c |",
+ "+---+---+-----+",
+ "| 1 | | 1.1 |",
+ "| 2 | | 2.2 |",
+ "| 3 | | 3.3 |",
+ "+---+---+-----+",
+ ];
+ assert_batches_eq!(expected, &batches);
+
+    // And with a projection applied that selects (`c`, `a`, `b`)
+ let projection =
+ ProjectionExprs::from_indices(&[2, 0, 1], table_schema.table_schema());
+ let batches = test_case
+ .clone()
+ .with_projection(projection)
+ .execute()
+ .await?;
+ #[rustfmt::skip]
+ let expected = [
+ "+-----+---+---+",
+ "| c | a | b |",
+ "+-----+---+---+",
+ "| 1.1 | 1 | |",
+ "| 2.2 | 2 | |",
+ "| 3.3 | 3 | |",
+ "+-----+---+---+",
+ ];
+ assert_batches_eq!(expected, &batches);
+
+ // And with a filter on a, b
+ // a = 1 or b is null and a = 3
+ let filter = col("a")
+ .eq(lit(1))
+ .or(col("b").is_null().and(col("a").eq(lit(3))));
+ let batches = test_case
+ .clone()
+ .with_predicate(filter.clone())
+ .execute()
+ .await?;
+ #[rustfmt::skip]
+ let expected = [
+ "+---+---+-----+",
+ "| a | b | c |",
+ "+---+---+-----+",
+ "| 1 | | 1.1 |",
+ "| 3 | | 3.3 |",
+ "+---+---+-----+",
+ ];
+ assert_batches_eq!(expected, &batches);
+ // With only statistics-based filter pushdown
+ let batches = test_case
+ .clone()
+ .with_predicate(filter)
+ .push_down_filters(false)
+ .execute()
+ .await?;
+ #[rustfmt::skip]
+ let expected = [
+ "+---+---+-----+",
+ "| a | b | c |",
+ "+---+---+-----+",
+ "| 1 | | 1.1 |",
+ "| 2 | | 2.2 |",
+ "| 3 | | 3.3 |",
+ "+---+---+-----+",
+ ];
+ assert_batches_eq!(expected, &batches);
+
+ // Filter `b is not null or a = 24` doesn't match any rows
+ let filter = col("b").is_not_null().or(col("a").eq(lit(24)));
+ let batches = test_case
+ .clone()
+ .with_predicate(filter.clone())
+ .execute()
+ .await?;
+ // There should be zero batches
+ assert_eq!(batches.len(), 0);
+ // With only statistics-based filter pushdown
+ let batches = test_case
+ .clone()
+ .with_predicate(filter)
+ .push_down_filters(false)
+ .execute()
+ .await?;
+ // There will be data: the filter is (null) is not null or a = 24.
+    // Statistics pruning doesn't handle `null is not null` so it resolves to `true or a = 24` -> `true` so no row groups are pruned
+ #[rustfmt::skip]
+ let expected = [
+ "+---+---+-----+",
+ "| a | b | c |",
+ "+---+---+-----+",
+ "| 1 | | 1.1 |",
+ "| 2 | | 2.2 |",
+ "| 3 | | 3.3 |",
+ "+---+---+-----+",
+ ];
+ assert_batches_eq!(expected, &batches);
+    // On the other hand the filter `b = 'foo' and a = 24` should prune all data even with only statistics-based pushdown
+ let filter = col("b").eq(lit("foo")).and(col("a").eq(lit(24)));
+ let batches = test_case
+ .clone()
+ .with_predicate(filter)
+ .push_down_filters(false)
+ .execute()
+ .await?;
+ // There should be zero batches
+ assert_eq!(batches.len(), 0);
+
+ Ok(())
+}
+
#[tokio::test]
-async fn test_parquet_integration_with_schema_adapter_and_expression_rewriter(
-) -> Result<()> {
+#[cfg(feature = "parquet")]
+async fn test_parquet_integration_with_physical_expr_adapter() -> Result<()> {
// Create test data
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![
@@ -246,12 +619,19 @@ async fn test_parquet_integration_with_schema_adapter_and_expression_rewriter(
let ctx = SessionContext::new();
ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
- // Create a ParquetSource with the adapter factory
- let file_source = ParquetSource::new(batch.schema())
- .with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {}))?;
+ // Create a table schema with uppercase column names
+ let table_schema = Arc::new(Schema::new(vec![
+ Field::new("ID", DataType::Int32, false),
+ Field::new("NAME", DataType::Utf8, true),
+ ]));
+
+ // Create a ParquetSource with the table schema (uppercase columns)
+ let file_source = Arc::new(ParquetSource::new(table_schema.clone()));
+    // Use PhysicalExprAdapterFactory to map uppercase column names to lowercase
let config = FileScanConfigBuilder::new(store_url, file_source)
.with_file(PartitionedFile::new(path, file_size))
+ .with_expr_adapter(Some(Arc::new(UppercasePhysicalExprAdapterFactory)))
.build();
// Create a data source executor
@@ -265,10 +645,28 @@ async fn test_parquet_integration_with_schema_adapter_and_expression_rewriter(
// There should be one batch
assert_eq!(batches.len(), 1);
-    // Verify the schema has the original column names (schema adapter not applied in DataSourceExec)
+ // Verify the schema has the uppercase column names
let result_schema = batches[0].schema();
- assert_eq!(result_schema.field(0).name(), "id");
- assert_eq!(result_schema.field(1).name(), "name");
+ assert_eq!(result_schema.field(0).name(), "ID");
+ assert_eq!(result_schema.field(1).name(), "NAME");
+
+ // Verify the data was correctly read from the lowercase file columns
+    // This confirms the PhysicalExprAdapter successfully mapped uppercase -> lowercase
+ let id_array = batches[0]
+ .column(0)
+ .as_any()
+ .downcast_ref::<arrow::array::Int32Array>()
+ .expect("Expected Int32Array for ID column");
+ assert_eq!(id_array.values(), &[1, 2, 3]);
+
+ let name_array = batches[0]
+ .column(1)
+ .as_any()
+ .downcast_ref::<arrow::array::StringArray>()
+ .expect("Expected StringArray for NAME column");
+ assert_eq!(name_array.value(0), "a");
+ assert_eq!(name_array.value(1), "b");
+ assert_eq!(name_array.value(2), "c");
Ok(())
}
@@ -306,28 +704,6 @@ async fn test_multi_source_schema_adapter_reuse() -> Result<()> {
);
}
- // Test ParquetSource
- #[cfg(feature = "parquet")]
- {
- let schema =
-            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
- let source = ParquetSource::new(schema);
- let source_with_adapter = source
- .clone()
- .with_schema_adapter_factory(factory.clone())
- .unwrap();
-
- let base_source: Arc<dyn FileSource> = source.into();
- assert!(base_source.schema_adapter_factory().is_none());
- assert!(source_with_adapter.schema_adapter_factory().is_some());
-
-        let retrieved_factory = source_with_adapter.schema_adapter_factory().unwrap();
- assert_eq!(
- format!("{:?}", retrieved_factory.as_ref()),
- format!("{:?}", factory.as_ref())
- );
- }
-
// Test CsvSource
{
let schema =
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index a2ce16cd53..9cc061cc45 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -483,11 +483,8 @@ impl FileFormat for ParquetFormat {
source = self.set_source_encryption_factory(source, state)?;
- // Apply schema adapter factory before building the new config
- let file_source = source.apply_schema_adapter(&conf)?;
-
let conf = FileScanConfigBuilder::from(conf)
- .with_source(file_source)
+ .with_source(Arc::new(source))
.build();
Ok(DataSourceExec::from_data_source(conf))
}
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index fd2907b86b..08534f8076 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -25,7 +25,8 @@ use crate::{
};
use arrow::array::RecordBatch;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
-use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
+use datafusion_physical_expr::projection::ProjectionExprs;
+use datafusion_physical_expr::utils::reassign_expr_columns;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -97,8 +98,6 @@ pub(super) struct ParquetOpener {
/// Should the bloom filter be read from parquet, if present, to skip row
/// groups
pub enable_bloom_filter: bool,
- /// Schema adapter factory
- pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
/// Should row group pruning be applied
pub enable_row_group_stats_pruning: bool,
/// Coerce INT96 timestamps to specific TimeUnit
@@ -140,12 +139,7 @@ impl FileOpener for ParquetOpener {
let batch_size = self.batch_size;
- let projected_schema =
-            SchemaRef::from(self.logical_file_schema.project(&self.projection)?);
- let schema_adapter_factory = Arc::clone(&self.schema_adapter_factory);
- let schema_adapter = self
- .schema_adapter_factory
- .create(projected_schema, Arc::clone(&self.logical_file_schema));
+ let projection = Arc::clone(&self.projection);
let mut predicate = self.predicate.clone();
let logical_file_schema = Arc::clone(&self.logical_file_schema);
let partition_fields = self.partition_fields.clone();
@@ -269,7 +263,7 @@ impl FileOpener for ParquetOpener {
// Adapt the predicate to the physical file schema.
// This evaluates missing columns and inserts any necessary casts.
- if let Some(expr_adapter_factory) = expr_adapter_factory {
+ if let Some(expr_adapter_factory) = expr_adapter_factory.as_ref() {
predicate = predicate
.map(|p| {
let partition_values = partition_fields
@@ -320,13 +314,26 @@ impl FileOpener for ParquetOpener {
reader_metadata,
);
- let (schema_mapping, adapted_projections) =
- schema_adapter.map_schema(&physical_file_schema)?;
+ let mut projection =
+                ProjectionExprs::from_indices(&projection, &logical_file_schema);
+ if let Some(expr_adapter_factory) = expr_adapter_factory {
+ let adapter = expr_adapter_factory
+ .create(
+ Arc::clone(&logical_file_schema),
+ Arc::clone(&physical_file_schema),
+ )
+ .with_partition_values(
+ partition_fields
+ .iter()
+ .cloned()
+ .zip(partitioned_file.partition_values.clone())
+ .collect_vec(),
+ );
+                projection = projection.try_map_exprs(|expr| adapter.rewrite(expr))?;
+ }
+ let indices = projection.column_indices();
- let mask = ProjectionMask::roots(
- builder.parquet_schema(),
- adapted_projections.iter().cloned(),
- );
+            let mask = ProjectionMask::roots(builder.parquet_schema(), indices);
// Filter pushdown: evaluate predicates during scan
            if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
@@ -337,7 +344,6 @@ impl FileOpener for ParquetOpener {
builder.metadata(),
reorder_predicates,
&file_metrics,
- &schema_adapter_factory,
);
match row_filter {
@@ -464,6 +470,16 @@ impl FileOpener for ParquetOpener {
file_metrics.predicate_cache_inner_records.clone();
let predicate_cache_records =
file_metrics.predicate_cache_records.clone();
+ let stream_schema = Arc::clone(stream.schema());
+
+ // Rebase column indices to match the narrowed stream schema.
+            // The projection expressions have indices based on physical_file_schema,
+            // but the stream only contains the columns selected by the ProjectionMask.
+ let projection = projection
+                .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
+
+ let projector = projection.make_projector(&stream_schema)?;
+
let stream = stream.map_err(DataFusionError::from).map(move |b| {
b.and_then(|b| {
copy_arrow_reader_metrics(
@@ -471,7 +487,7 @@ impl FileOpener for ParquetOpener {
&predicate_cache_inner_records,
&predicate_cache_records,
);
- schema_mapping.map_batch(b)
+ projector.project_batch(&b)
})
});
@@ -764,29 +780,19 @@ fn should_enable_page_index(
mod test {
use std::sync::Arc;
- use arrow::{
- compute::cast,
- datatypes::{DataType, Field, Schema, SchemaRef},
- };
+ use arrow::datatypes::{DataType, Field, Schema};
use bytes::{BufMut, BytesMut};
use datafusion_common::{
- assert_batches_eq, record_batch, stats::Precision, ColumnStatistics,
- DataFusionError, ScalarValue, Statistics,
- };
- use datafusion_datasource::{
- file_stream::FileOpener,
- schema_adapter::{
-            DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
- },
- PartitionedFile,
+        record_batch, stats::Precision, ColumnStatistics, DataFusionError, ScalarValue,
+ Statistics,
};
+ use datafusion_datasource::{file_stream::FileOpener, PartitionedFile};
use datafusion_expr::{col, lit};
use datafusion_physical_expr::{
expressions::DynamicFilterPhysicalExpr, planner::logical2physical,
PhysicalExpr,
};
use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
-    use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
+ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::{Stream, StreamExt};
use object_store::{memory::InMemory, path::Path, ObjectStore};
use parquet::arrow::ArrowWriter;
@@ -810,21 +816,6 @@ mod test {
(num_batches, num_rows)
}
- async fn collect_batches(
- mut stream: std::pin::Pin<
- Box<
-                dyn Stream<Item = Result<arrow::array::RecordBatch, DataFusionError>>
- + Send,
- >,
- >,
- ) -> Vec<arrow::array::RecordBatch> {
- let mut batches = vec![];
- while let Some(Ok(batch)) = stream.next().await {
- batches.push(batch);
- }
- batches
- }
-
async fn write_parquet(
store: Arc<dyn ObjectStore>,
filename: &str,
@@ -898,7 +889,6 @@ mod test {
force_filter_selections: false,
enable_page_index: false,
enable_bloom_filter: false,
- schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
coerce_int96: None,
#[cfg(feature = "parquet_encryption")]
@@ -972,7 +962,6 @@ mod test {
force_filter_selections: false,
enable_page_index: false,
enable_bloom_filter: false,
- schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
coerce_int96: None,
#[cfg(feature = "parquet_encryption")]
@@ -1062,7 +1051,6 @@ mod test {
force_filter_selections: false,
enable_page_index: false,
enable_bloom_filter: false,
- schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
coerce_int96: None,
#[cfg(feature = "parquet_encryption")]
@@ -1155,7 +1143,6 @@ mod test {
force_filter_selections: false,
enable_page_index: false,
enable_bloom_filter: false,
- schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: false, // note that this is
false!
coerce_int96: None,
#[cfg(feature = "parquet_encryption")]
@@ -1248,7 +1235,6 @@ mod test {
force_filter_selections: false,
enable_page_index: false,
enable_bloom_filter: false,
- schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
coerce_int96: None,
#[cfg(feature = "parquet_encryption")]
@@ -1277,155 +1263,4 @@ mod test {
assert_eq!(num_batches, 0);
assert_eq!(num_rows, 0);
}
-
- fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
- match metrics.sum_by_name(metric_name) {
- Some(v) => v.as_usize(),
- _ => {
- panic!(
-                    "Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}"
- );
- }
- }
- }
-
- #[tokio::test]
- async fn test_custom_schema_adapter_no_rewriter() {
-        // Make a hardcoded schema adapter that adds a new column "b" with default value 0.0
- // and converts the first column "a" from Int32 to UInt64.
- #[derive(Debug, Clone)]
- struct CustomSchemaMapper;
-
- impl SchemaMapper for CustomSchemaMapper {
- fn map_batch(
- &self,
- batch: arrow::array::RecordBatch,
- ) -> datafusion_common::Result<arrow::array::RecordBatch> {
- let a_column = cast(batch.column(0), &DataType::UInt64)?;
- // Add in a new column "b" with default value 0.0
- let b_column =
-                    arrow::array::Float64Array::from(vec![Some(0.0); batch.num_rows()]);
- let columns = vec![a_column, Arc::new(b_column)];
- let new_schema = Arc::new(Schema::new(vec![
- Field::new("a", DataType::UInt64, false),
- Field::new("b", DataType::Float64, false),
- ]));
- Ok(arrow::record_batch::RecordBatch::try_new(
- new_schema, columns,
- )?)
- }
-
- fn map_column_statistics(
- &self,
- file_col_statistics: &[ColumnStatistics],
- ) -> datafusion_common::Result<Vec<ColumnStatistics>> {
- Ok(vec![
- file_col_statistics[0].clone(),
- ColumnStatistics::new_unknown(),
- ])
- }
- }
-
- #[derive(Debug, Clone)]
- struct CustomSchemaAdapter;
-
- impl SchemaAdapter for CustomSchemaAdapter {
- fn map_schema(
- &self,
- _file_schema: &Schema,
- ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>
- {
- let mapper = Arc::new(CustomSchemaMapper);
-                let projection = vec![0]; // We only need to read the first column "a" from the file
- Ok((mapper, projection))
- }
-
- fn map_column_index(
- &self,
- index: usize,
- file_schema: &Schema,
- ) -> Option<usize> {
- if index < file_schema.fields().len() {
- Some(index)
- } else {
- None // The new column "b" is not in the original schema
- }
- }
- }
-
- #[derive(Debug, Clone)]
- struct CustomSchemaAdapterFactory;
-
- impl SchemaAdapterFactory for CustomSchemaAdapterFactory {
- fn create(
- &self,
- _projected_table_schema: SchemaRef,
- _table_schema: SchemaRef,
- ) -> Box<dyn SchemaAdapter> {
- Box::new(CustomSchemaAdapter)
- }
- }
-
-        // Test that if no expression rewriter is provided we use a schemaadapter to adapt the data to the expression
- let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
-        let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
- // Write out the batch to a Parquet file
- let data_size =
-            write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await;
- let file = PartitionedFile::new(
- "test.parquet".to_string(),
- u64::try_from(data_size).unwrap(),
- );
- let table_schema = Arc::new(Schema::new(vec![
- Field::new("a", DataType::UInt64, false),
- Field::new("b", DataType::Float64, false),
- ]));
-
- let make_opener = |predicate| ParquetOpener {
- partition_index: 0,
- projection: Arc::new([0, 1]),
- batch_size: 1024,
- limit: None,
- predicate: Some(predicate),
- logical_file_schema: Arc::clone(&table_schema),
- metadata_size_hint: None,
- metrics: ExecutionPlanMetricsSet::new(),
-            parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new(
- Arc::clone(&store),
- )),
- partition_fields: vec![],
- pushdown_filters: true,
- reorder_filters: false,
- force_filter_selections: false,
- enable_page_index: false,
- enable_bloom_filter: false,
- schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory),
- enable_row_group_stats_pruning: false,
- coerce_int96: None,
- #[cfg(feature = "parquet_encryption")]
- file_decryption_properties: None,
- expr_adapter_factory: None,
- #[cfg(feature = "parquet_encryption")]
- encryption_factory: None,
- max_predicate_cache_size: None,
- };
-
-        let predicate = logical2physical(&col("a").eq(lit(1u64)), &table_schema);
- let opener = make_opener(predicate);
- let stream = opener.open(file.clone()).unwrap().await.unwrap();
- let batches = collect_batches(stream).await;
-
- #[rustfmt::skip]
- let expected = [
- "+---+-----+",
- "| a | b |",
- "+---+-----+",
- "| 1 | 0.0 |",
- "+---+-----+",
- ];
- assert_batches_eq!(expected, &batches);
- let metrics = opener.metrics.clone_inner();
- assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 0);
- assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 2);
- }
}
diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index de9fe181f8..059663c231 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -73,8 +73,7 @@ use parquet::file::metadata::ParquetMetaData;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
-use datafusion_common::Result;
-use datafusion_datasource::schema_adapter::{SchemaAdapterFactory, SchemaMapper};
+use datafusion_common::{internal_datafusion_err, Result};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
@@ -106,8 +105,6 @@ pub(crate) struct DatafusionArrowPredicate {
rows_matched: metrics::Count,
/// how long was spent evaluating this predicate
time: metrics::Time,
- /// used to perform type coercion while filtering rows
- schema_mapper: Arc<dyn SchemaMapper>,
}
impl DatafusionArrowPredicate {
@@ -131,7 +128,6 @@ impl DatafusionArrowPredicate {
rows_pruned,
rows_matched,
time,
- schema_mapper: candidate.schema_mapper,
})
}
}
@@ -142,8 +138,6 @@ impl ArrowPredicate for DatafusionArrowPredicate {
}
fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
- let batch = self.schema_mapper.map_batch(batch)?;
-
// scoped timer updates on drop
let mut timer = self.time.timer();
@@ -183,12 +177,8 @@ pub(crate) struct FilterCandidate {
/// Can this filter use an index (e.g. a page index) to prune rows?
can_use_index: bool,
/// The projection to read from the file schema to get the columns
- /// required to pass through a `SchemaMapper` to the table schema
- /// upon which we then evaluate the filter expression.
+ /// required to evaluate the filter expression.
projection: Vec<usize>,
- /// A `SchemaMapper` used to map batches read from the file schema to
- /// the filter's projection of the table schema.
- schema_mapper: Arc<dyn SchemaMapper>,
/// The projected table schema that this filter references
filter_schema: SchemaRef,
}
@@ -198,42 +188,17 @@ pub(crate) struct FilterCandidate {
/// This will do several things
/// 1. Determine the columns required to evaluate the expression
/// 2. Calculate data required to estimate the cost of evaluating the filter
-/// 3. Rewrite column expressions in the predicate which reference columns not
-/// in the particular file schema.
-///
-/// # Schema Rewrite
-///
-/// When parquet files are read in the context of "schema evolution" there are
-/// potentially two schemas:
-///
-/// 1. The table schema (the columns of the table that the parquet file is part of)
-/// 2. The file schema (the columns actually in the parquet file)
///
-/// There are times when the table schema contains columns that are not in the
-/// file schema, such as when new columns have been added in new parquet files
-/// but old files do not have the columns.
-///
-/// When a file is missing a column from the table schema, the value of the
-/// missing column is filled in by a `SchemaAdapter` (by default as `NULL`).
-///
-/// When a predicate is pushed down to the parquet reader, the predicate is
-/// evaluated in the context of the file schema.
-/// For each predicate we build a filter schema which is the projection of the table
-/// schema that contains only the columns that this filter references.
-/// If any columns from the file schema are missing from a particular file they are
-/// added by the `SchemaAdapter`, by default as `NULL`.
+/// Note that this does *not* handle any adaptation of the data schema to the expression schema;
+/// it is assumed that the expression has already been adapted to the file schema before being passed in here,
+/// generally using [`PhysicalExprAdapter`](datafusion_physical_expr_adapter::PhysicalExprAdapter).
struct FilterCandidateBuilder {
expr: Arc<dyn PhysicalExpr>,
/// The schema of this parquet file.
- /// Columns may have different types from the table schema and there may be
-    /// columns in the file schema that are not in the table schema or columns that
- /// are in the table schema that are not in the file schema.
file_schema: SchemaRef,
/// The schema of the table (merged schema) -- columns may be in different
/// order than in the file and have columns that are not in the file schema
table_schema: SchemaRef,
-    /// A `SchemaAdapterFactory` used to map the file schema to the table schema.
- schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
}
impl FilterCandidateBuilder {
@@ -241,13 +206,11 @@ impl FilterCandidateBuilder {
expr: Arc<dyn PhysicalExpr>,
file_schema: Arc<Schema>,
table_schema: Arc<Schema>,
- schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
) -> Self {
Self {
expr,
file_schema,
table_schema,
- schema_adapter_factory,
}
}
@@ -270,10 +233,24 @@ impl FilterCandidateBuilder {
.project(&required_indices_into_table_schema)?,
);
- let (schema_mapper, projection_into_file_schema) = self
- .schema_adapter_factory
- .create(Arc::clone(&projected_table_schema), self.table_schema)
- .map_schema(&self.file_schema)?;
+ // Compute the projection into the file schema by matching column names
+    let mut projection_into_file_schema: Vec<usize> = projected_table_schema
+ .fields()
+ .iter()
+ .filter_map(|f| self.file_schema.index_of(f.name()).ok())
+ .collect();
+ // Sort and remove duplicates
+ let original_len = projection_into_file_schema.len();
+ projection_into_file_schema.sort_unstable();
+ projection_into_file_schema.dedup();
+ if projection_into_file_schema.len() < original_len {
+ // This should not happen, as we built projected_table_schema from
+ // the table schema which should not have duplicate column names.
+ return Err(internal_datafusion_err!(
+            "Duplicate column names found when building filter candidate: {:?}",
+ projection_into_file_schema
+ ));
+ }
    let required_bytes = size_of_columns(&projection_into_file_schema, metadata)?;
    let can_use_index = columns_sorted(&projection_into_file_schema, metadata)?;
@@ -283,7 +260,6 @@ impl FilterCandidateBuilder {
required_bytes,
can_use_index,
projection: projection_into_file_schema,
- schema_mapper: Arc::clone(&schema_mapper),
filter_schema: Arc::clone(&projected_table_schema),
}))
}
@@ -429,7 +405,6 @@ pub fn build_row_filter(
metadata: &ParquetMetaData,
reorder_predicates: bool,
file_metrics: &ParquetFileMetrics,
- schema_adapter_factory: &Arc<dyn SchemaAdapterFactory>,
) -> Result<Option<RowFilter>> {
let rows_pruned = &file_metrics.pushdown_rows_pruned;
let rows_matched = &file_metrics.pushdown_rows_matched;
@@ -447,7 +422,6 @@ pub fn build_row_filter(
Arc::clone(expr),
Arc::clone(physical_file_schema),
Arc::clone(predicate_file_schema),
- Arc::clone(schema_adapter_factory),
)
.build(metadata)
})
@@ -511,9 +485,11 @@ mod test {
use datafusion_common::ScalarValue;
use arrow::datatypes::{Field, TimeUnit::Nanosecond};
- use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
use datafusion_expr::{col, Expr};
use datafusion_physical_expr::planner::logical2physical;
+ use datafusion_physical_expr_adapter::{
+ DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory,
+ };
use datafusion_physical_plan::metrics::{Count, Time};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
@@ -538,17 +514,12 @@ mod test {
let expr = col("int64_list").is_not_null();
let expr = logical2physical(&expr, &table_schema);
- let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
let table_schema = Arc::new(table_schema.clone());
- let candidate = FilterCandidateBuilder::new(
- expr,
- table_schema.clone(),
- table_schema,
- schema_adapter_factory,
- )
- .build(metadata)
- .expect("building candidate");
+ let candidate =
+            FilterCandidateBuilder::new(expr, table_schema.clone(), table_schema)
+ .build(metadata)
+ .expect("building candidate");
assert!(candidate.is_none());
}
@@ -578,17 +549,16 @@ mod test {
None,
));
let expr = logical2physical(&expr, &table_schema);
- let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
+ let expr = DefaultPhysicalExprAdapterFactory {}
+ .create(Arc::new(table_schema.clone()), Arc::clone(&file_schema))
+ .rewrite(expr)
+ .expect("rewriting expression");
let table_schema = Arc::new(table_schema.clone());
- let candidate = FilterCandidateBuilder::new(
- expr,
- file_schema.clone(),
- table_schema.clone(),
- schema_adapter_factory,
- )
- .build(&metadata)
- .expect("building candidate")
- .expect("candidate expected");
+ let candidate =
+            FilterCandidateBuilder::new(expr, file_schema.clone(), table_schema.clone())
+ .build(&metadata)
+ .expect("building candidate")
+ .expect("candidate expected");
let mut row_filter = DatafusionArrowPredicate::try_new(
candidate,
@@ -619,16 +589,15 @@ mod test {
None,
));
let expr = logical2physical(&expr, &table_schema);
- let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
- let candidate = FilterCandidateBuilder::new(
- expr,
- file_schema,
- table_schema,
- schema_adapter_factory,
- )
- .build(&metadata)
- .expect("building candidate")
- .expect("candidate expected");
+ // Rewrite the expression to add CastExpr for type coercion
+ let expr = DefaultPhysicalExprAdapterFactory {}
+ .create(table_schema.clone(), Arc::clone(&file_schema))
+ .rewrite(expr)
+ .expect("rewriting expression");
+    let candidate = FilterCandidateBuilder::new(expr, file_schema, table_schema)
+ .build(&metadata)
+ .expect("building candidate")
+ .expect("candidate expected");
let mut row_filter = DatafusionArrowPredicate::try_new(
candidate,
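The contract above is worth spelling out: the predicate must already be rewritten against the file schema before it reaches the row filter, which is exactly what the updated tests do. A minimal sketch of that pattern follows; the schemas and the predicate are illustrative assumptions, not taken from the commit.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_expr::{col, lit};
use datafusion_physical_expr::planner::logical2physical;
use datafusion_physical_expr_adapter::{
    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory,
};

// Illustrative sketch only: a table column `a: Int64` stored as `Int32` in the file.
// FilterCandidateBuilder itself is crate-private, so callers only need to adapt the
// predicate; the parquet source handles the rest.
fn adapt_predicate_to_file_schema() -> Result<()> {
    let table_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    // Plan the predicate against the table (logical) schema ...
    let predicate = logical2physical(&col("a").eq(lit(1i64)), &table_schema);

    // ... then rewrite it so it can be evaluated directly on batches read with the
    // file (physical) schema; the default adapter inserts the necessary casts.
    let adapted = DefaultPhysicalExprAdapterFactory {}
        .create(Arc::clone(&table_schema), file_schema)
        .rewrite(predicate)?;
    println!("{adapted}");
    Ok(())
}
```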
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 940cf32901..1abe126369 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -32,9 +32,6 @@ use datafusion_common::config::EncryptionFactoryOptions;
use datafusion_datasource::as_file_source;
use datafusion_datasource::file_stream::FileOpener;
use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
-use datafusion_datasource::schema_adapter::{
- DefaultSchemaAdapterFactory, SchemaAdapterFactory,
-};
use arrow::datatypes::TimeUnit;
use datafusion_common::config::TableParquetOptions;
@@ -135,7 +132,7 @@ use parquet::encryption::decrypt::FileDecryptionProperties;
/// details.
///
/// * Schema evolution: read parquet files with different schemas into a unified
-/// table schema. See [`SchemaAdapterFactory`] for more details.
+/// table schema. See [`DefaultPhysicalExprAdapterFactory`] for more details.
///
/// * metadata_size_hint: controls the number of bytes read from the end of the
///   file in the initial I/O when the default [`ParquetFileReaderFactory`]. If a
@@ -262,12 +259,13 @@ use parquet::encryption::decrypt::FileDecryptionProperties;
/// [`Self::with_pushdown_filters`]).
///
/// * Step 5: As each [`RecordBatch`] is read, it may be adapted by a
-///   [`SchemaAdapter`] to match the table schema. By default missing columns are
-///   filled with nulls, but this can be customized via [`SchemaAdapterFactory`].
+///   [`DefaultPhysicalExprAdapterFactory`] to match the table schema. By default missing columns are
+///   filled with nulls, but this can be customized via [`PhysicalExprAdapterFactory`].
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
/// [`SchemaAdapter`]: datafusion_datasource::schema_adapter::SchemaAdapter
/// [`ParquetMetadata`]: parquet::file::metadata::ParquetMetaData
+/// [`PhysicalExprAdapterFactory`]: datafusion_physical_expr_adapter::PhysicalExprAdapterFactory
#[derive(Clone, Debug)]
pub struct ParquetSource {
/// Options for reading Parquet files
@@ -282,8 +280,6 @@ pub struct ParquetSource {
pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
/// Optional user defined parquet file reader factory
    pub(crate) parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
- /// Optional user defined schema adapter
- pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
/// Batch size configuration
pub(crate) batch_size: Option<usize>,
/// Optional hint for the size of the parquet metadata
@@ -309,7 +305,6 @@ impl ParquetSource {
metrics: ExecutionPlanMetricsSet::new(),
predicate: None,
parquet_file_reader_factory: None,
- schema_adapter_factory: None,
batch_size: None,
metadata_size_hint: None,
#[cfg(feature = "parquet_encryption")]
@@ -456,28 +451,6 @@ impl ParquetSource {
self.table_parquet_options.global.max_predicate_cache_size
}
- /// Applies schema adapter factory from the FileScanConfig if present.
- ///
- /// # Arguments
- /// * `conf` - FileScanConfig that may contain a schema adapter factory
- /// # Returns
-    /// The converted FileSource with schema adapter factory applied if provided
- pub fn apply_schema_adapter(
- self,
- conf: &FileScanConfig,
- ) -> datafusion_common::Result<Arc<dyn FileSource>> {
- let file_source: Arc<dyn FileSource> = self.into();
-
-        // If the FileScanConfig.file_source() has a schema adapter factory, apply it
- if let Some(factory) = conf.file_source().schema_adapter_factory() {
- file_source.with_schema_adapter_factory(
- Arc::<dyn SchemaAdapterFactory>::clone(&factory),
- )
- } else {
- Ok(file_source)
- }
- }
-
#[cfg(feature = "parquet_encryption")]
fn get_encryption_factory_with_config(
&self,
@@ -526,43 +499,10 @@ impl FileSource for ParquetSource {
) -> datafusion_common::Result<Arc<dyn FileOpener>> {
let split_projection = self.projection.clone();
- let (expr_adapter_factory, schema_adapter_factory) = match (
- base_config.expr_adapter_factory.as_ref(),
- self.schema_adapter_factory.as_ref(),
- ) {
- (Some(expr_adapter_factory), Some(schema_adapter_factory)) => {
-                // Use both the schema adapter factory and the expr adapter factory.
-                // This results in the SchemaAdapter being used for projections (e.g. a column was selected that is a UInt32 in the file and a UInt64 in the table schema)
-                // but the PhysicalExprAdapterFactory being used for predicate pushdown and stats pruning.
- (
- Some(Arc::clone(expr_adapter_factory)),
- Arc::clone(schema_adapter_factory),
- )
- }
- (Some(expr_adapter_factory), None) => {
-                // If no custom schema adapter factory is provided but an expr adapter factory is provided use the expr adapter factory alongside the default schema adapter factory.
-                // This means that the PhysicalExprAdapterFactory will be used for predicate pushdown and stats pruning, while the default schema adapter factory will be used for projections.
- (
- Some(Arc::clone(expr_adapter_factory)),
- Arc::new(DefaultSchemaAdapterFactory) as _,
- )
- }
- (None, Some(schema_adapter_factory)) => {
-                // If a custom schema adapter factory is provided but no expr adapter factory is provided use the custom SchemaAdapter for both projections and predicate pushdown.
-                // This maximizes compatibility with existing code that uses the SchemaAdapter API and did not explicitly opt into the PhysicalExprAdapterFactory API.
- (None, Arc::clone(schema_adapter_factory) as _)
- }
- (None, None) => {
-                // If no custom schema adapter factory or expr adapter factory is provided, use the default schema adapter factory and the default physical expr adapter factory.
-                // This means that the default SchemaAdapter will be used for projections (e.g. a column was selected that is a UInt32 in the file and a UInt64 in the table schema)
-                // and the default PhysicalExprAdapterFactory will be used for predicate pushdown and stats pruning.
-                // This is the default behavior with no customization and means that most users of DataFusion will be cut over to the new PhysicalExprAdapterFactory API.
- (
- Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
- Arc::new(DefaultSchemaAdapterFactory) as _,
- )
- }
- };
+ let expr_adapter_factory = base_config
+ .expr_adapter_factory
+ .clone()
+            .or_else(|| Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _));
let parquet_file_reader_factory =
self.parquet_file_reader_factory.clone().unwrap_or_else(|| {
@@ -604,7 +544,6 @@ impl FileSource for ParquetSource {
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
            enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
- schema_adapter_factory,
coerce_int96,
#[cfg(feature = "parquet_encryption")]
file_decryption_properties,
@@ -780,20 +719,6 @@ impl FileSource for ParquetSource {
)
.with_updated_node(source))
}
-
- fn with_schema_adapter_factory(
- &self,
- schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
- ) -> datafusion_common::Result<Arc<dyn FileSource>> {
- Ok(Arc::new(Self {
- schema_adapter_factory: Some(schema_adapter_factory),
- ..self.clone()
- }))
- }
-
- fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
- self.schema_adapter_factory.clone()
- }
}
#[cfg(test)]
diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
index 61cc97dae3..daa4e6203c 100644
--- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs
+++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs
@@ -23,12 +23,14 @@ use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, FieldRef, Schema, SchemaRef};
use datafusion_common::{
exec_err,
+ nested_struct::validate_struct_compatibility,
tree_node::{Transformed, TransformedResult, TreeNode},
Result, ScalarValue,
};
use datafusion_functions::core::getfield::GetFieldFunc;
+use datafusion_physical_expr::expressions::CastColumnExpr;
use datafusion_physical_expr::{
- expressions::{self, CastExpr, Column},
+ expressions::{self, Column},
ScalarFunctionExpr,
};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
@@ -380,10 +382,8 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
column.name()
);
}
-                    // If the column is missing from the physical schema fill it in with nulls as `SchemaAdapter` would do.
-                    // TODO: do we need to sync this with what the `SchemaAdapter` actually does?
-                    // While the default implementation fills in nulls in theory a custom `SchemaAdapter` could do something else!
-                    // See https://github.com/apache/datafusion/issues/16527
+                    // If the column is missing from the physical schema fill it in with nulls as `SchemaAdapter` used to do.
+                    // If users want a different behavior they need to provide a custom `PhysicalExprAdapter` implementation.
                    let null_value = ScalarValue::Null.cast_to(logical_field.data_type())?;
return Ok(Transformed::yes(expressions::lit(null_value)));
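For columns that exist only in the table schema, the rewrite above replaces the column reference with a typed null literal. A small sketch of that behavior, using assumed schemas and column names (column `b` is present in the table schema but absent from the file):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr_adapter::{
    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory,
};

// Illustrative sketch only: not part of the commit's diff.
fn missing_column_becomes_null_literal() -> Result<()> {
    let logical_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Float64, true),
    ]));
    // The file only contains column "a".
    let physical_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    let adapter =
        DefaultPhysicalExprAdapterFactory.create(Arc::clone(&logical_schema), physical_schema);

    // "b" is missing from the file, so the reference is rewritten to a Float64 NULL literal.
    let rewritten = adapter.rewrite(Arc::new(Column::new("b", 1)))?;
    assert!(rewritten.as_any().downcast_ref::<Literal>().is_some());
    Ok(())
}
```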
@@ -413,20 +413,34 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
        // TODO: add optimization to move the cast from the column to literal expressions in the case of `col = 123`
        // since that's much cheaper to evaluate.
        // See https://github.com/apache/datafusion/issues/15780#issuecomment-2824716928
- let is_compatible =
-            can_cast_types(physical_field.data_type(), logical_field.data_type());
- if !is_compatible {
- return exec_err!(
-                "Cannot cast column '{}' from '{}' (physical data type) to '{}' (logical data type)",
- column.name(),
- physical_field.data_type(),
- logical_field.data_type()
- );
+ //
+ // For struct types, use validate_struct_compatibility which handles:
+ // - Missing fields in source (filled with nulls)
+ // - Extra fields in source (ignored)
+ // - Recursive validation of nested structs
+ // For non-struct types, use Arrow's can_cast_types
+ match (physical_field.data_type(), logical_field.data_type()) {
+            (DataType::Struct(physical_fields), DataType::Struct(logical_fields)) => {
+                validate_struct_compatibility(physical_fields, logical_fields)?;
+ }
+ _ => {
+ let is_compatible =
+                    can_cast_types(physical_field.data_type(), logical_field.data_type());
+ if !is_compatible {
+ return exec_err!(
+                        "Cannot cast column '{}' from '{}' (physical data type) to '{}' (logical data type)",
+ column.name(),
+ physical_field.data_type(),
+ logical_field.data_type()
+ );
+ }
+ }
}
- let cast_expr = Arc::new(CastExpr::new(
+ let cast_expr = Arc::new(CastColumnExpr::new(
Arc::new(column),
- logical_field.data_type().clone(),
+ Arc::new(physical_field.clone()),
+ Arc::new(logical_field.clone()),
None,
));
@@ -444,11 +458,14 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
#[cfg(test)]
mod tests {
use super::*;
- use arrow::array::{RecordBatch, RecordBatchOptions};
- use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+ use arrow::array::{
+ BooleanArray, Int32Array, Int64Array, RecordBatch, RecordBatchOptions,
+ StringArray, StringViewArray, StructArray,
+ };
+ use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
    use datafusion_common::{assert_contains, record_batch, Result, ScalarValue};
use datafusion_expr::Operator;
-    use datafusion_physical_expr::expressions::{col, lit, CastExpr, Column, Literal};
+ use datafusion_physical_expr::expressions::{col, lit, Column, Literal};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use itertools::Itertools;
use std::sync::Arc;
@@ -479,7 +496,7 @@ mod tests {
let result = adapter.rewrite(column_expr).unwrap();
// Should be wrapped in a cast expression
- assert!(result.as_any().downcast_ref::<CastExpr>().is_some());
+ assert!(result.as_any().downcast_ref::<CastColumnExpr>().is_some());
}
#[test]
@@ -510,9 +527,10 @@ mod tests {
println!("Rewritten expression: {result}");
let expected = expressions::BinaryExpr::new(
- Arc::new(CastExpr::new(
+ Arc::new(CastColumnExpr::new(
Arc::new(Column::new("a", 0)),
- DataType::Int64,
+ Arc::new(Field::new("a", DataType::Int32, false)),
+ Arc::new(Field::new("a", DataType::Int64, false)),
None,
)),
Operator::Plus,
@@ -554,7 +572,11 @@ mod tests {
let column_expr = Arc::new(Column::new("data", 0));
let error_msg = adapter.rewrite(column_expr).unwrap_err().to_string();
- assert_contains!(error_msg, "Cannot cast column 'data'");
+        // validate_struct_compatibility provides a more specific error about which field can't be cast
+ assert_contains!(
+ error_msg,
+ "Cannot cast struct field 'field1' from type Binary to type Int32"
+ );
}
#[test]
@@ -589,15 +611,30 @@ mod tests {
let result = adapter.rewrite(column_expr).unwrap();
- let expected = Arc::new(CastExpr::new(
+ let expected = Arc::new(CastColumnExpr::new(
Arc::new(Column::new("data", 0)),
- DataType::Struct(
- vec![
- Field::new("id", DataType::Int64, false),
- Field::new("name", DataType::Utf8View, true),
- ]
- .into(),
- ),
+ Arc::new(Field::new(
+ "data",
+ DataType::Struct(
+ vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, true),
+ ]
+ .into(),
+ ),
+ false,
+ )),
+ Arc::new(Field::new(
+ "data",
+ DataType::Struct(
+ vec![
+ Field::new("id", DataType::Int64, false),
+ Field::new("name", DataType::Utf8View, true),
+ ]
+ .into(),
+ ),
+ false,
+ )),
None,
)) as Arc<dyn PhysicalExpr>;
@@ -821,6 +858,118 @@ mod tests {
);
}
+ /// Test that struct columns are properly adapted including:
+ /// - Type casting of subfields (Int32 -> Int64, Utf8 -> Utf8View)
+ /// - Missing fields in logical schema are filled with nulls
+ #[test]
+ fn test_adapt_struct_batches() {
+ // Physical struct: {id: Int32, name: Utf8}
+ let physical_struct_fields: Fields = vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, true),
+ ]
+ .into();
+
+ let struct_array = StructArray::new(
+ physical_struct_fields.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![1, 2, 3])) as _,
+ Arc::new(StringArray::from(vec![
+ Some("alice"),
+ None,
+ Some("charlie"),
+ ])) as _,
+ ],
+ None,
+ );
+
+ let physical_schema = Arc::new(Schema::new(vec![Field::new(
+ "data",
+ DataType::Struct(physical_struct_fields),
+ false,
+ )]));
+
+ let physical_batch = RecordBatch::try_new(
+ Arc::clone(&physical_schema),
+ vec![Arc::new(struct_array)],
+ )
+ .unwrap();
+
+ // Logical struct: {id: Int64, name: Utf8View, extra: Boolean}
+ // - id: cast from Int32 to Int64
+ // - name: cast from Utf8 to Utf8View
+ // - extra: missing from physical, should be filled with nulls
+ let logical_struct_fields: Fields = vec![
+ Field::new("id", DataType::Int64, false),
+ Field::new("name", DataType::Utf8View, true),
+            Field::new("extra", DataType::Boolean, true), // New field, not in physical
+ ]
+ .into();
+
+ let logical_schema = Arc::new(Schema::new(vec![Field::new(
+ "data",
+ DataType::Struct(logical_struct_fields),
+ false,
+ )]));
+
+ let projection = vec![col("data", &logical_schema).unwrap()];
+
+ let factory = DefaultPhysicalExprAdapterFactory;
+ let adapter =
+            factory.create(Arc::clone(&logical_schema), Arc::clone(&physical_schema));
+
+ let adapted_projection = projection
+ .into_iter()
+ .map(|expr| adapter.rewrite(expr).unwrap())
+ .collect_vec();
+
+ let adapted_schema = Arc::new(Schema::new(
+ adapted_projection
+ .iter()
+ .map(|expr| expr.return_field(&physical_schema).unwrap())
+ .collect_vec(),
+ ));
+
+ let res = batch_project(
+ adapted_projection,
+ &physical_batch,
+ Arc::clone(&adapted_schema),
+ )
+ .unwrap();
+
+ assert_eq!(res.num_columns(), 1);
+
+ let result_struct = res
+ .column(0)
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .unwrap();
+
+ // Verify id field is cast to Int64
+ let id_col = result_struct.column_by_name("id").unwrap();
+ assert_eq!(id_col.data_type(), &DataType::Int64);
+ let id_values = id_col.as_any().downcast_ref::<Int64Array>().unwrap();
+ assert_eq!(
+ id_values.iter().collect_vec(),
+ vec![Some(1), Some(2), Some(3)]
+ );
+
+ // Verify name field is cast to Utf8View
+ let name_col = result_struct.column_by_name("name").unwrap();
+ assert_eq!(name_col.data_type(), &DataType::Utf8View);
+        let name_values = name_col.as_any().downcast_ref::<StringViewArray>().unwrap();
+ assert_eq!(
+ name_values.iter().collect_vec(),
+ vec![Some("alice"), None, Some("charlie")]
+ );
+
+ // Verify extra field (missing from physical) is filled with nulls
+ let extra_col = result_struct.column_by_name("extra").unwrap();
+ assert_eq!(extra_col.data_type(), &DataType::Boolean);
+        let extra_values = extra_col.as_any().downcast_ref::<BooleanArray>().unwrap();
+ assert_eq!(extra_values.iter().collect_vec(), vec![None, None, None]);
+ }
+
#[test]
fn test_try_rewrite_struct_field_access() {
// Test the core logic of try_rewrite_struct_field_access
diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs
index 0419161b53..a368aafbc6 100644
--- a/datafusion/physical-expr/src/expressions/cast.rs
+++ b/datafusion/physical-expr/src/expressions/cast.rs
@@ -740,6 +740,9 @@ mod tests {
Ok(())
}
+ // Tests for timestamp timezone casting have been moved to timestamps.slt
+ // See the "Casting between timestamp with and without timezone" section
+
#[test]
fn invalid_cast() {
// Ensure a useful error happens at plan time if invalid casts are used
diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs
index 3d6740510b..4688ac0e1b 100644
--- a/datafusion/physical-expr/src/projection.rs
+++ b/datafusion/physical-expr/src/projection.rs
@@ -240,6 +240,49 @@ impl ProjectionExprs {
self.exprs.iter().map(|e| Arc::clone(&e.expr))
}
+    /// Apply a fallible transformation to the [`PhysicalExpr`] of each projection.
+    ///
+    /// This method transforms the expression in each [`ProjectionExpr`] while preserving
+    /// the alias. This is useful for rewriting expressions, such as when adapting
+ /// expressions to a different schema.
+ ///
+ /// # Example
+ ///
+ /// ```rust
+ /// use std::sync::Arc;
+ /// use arrow::datatypes::{DataType, Field, Schema};
+ /// use datafusion_common::Result;
+ /// use datafusion_physical_expr::expressions::Column;
+ /// use datafusion_physical_expr::projection::ProjectionExprs;
+ /// use datafusion_physical_expr::PhysicalExpr;
+ ///
+ /// // Create a schema and projection
+ /// let schema = Arc::new(Schema::new(vec![
+ /// Field::new("a", DataType::Int32, false),
+ /// Field::new("b", DataType::Int32, false),
+ /// ]));
+ /// let projection = ProjectionExprs::from_indices(&[0, 1], &schema);
+ ///
+ /// // Transform each expression (this example just clones them)
+ /// let transformed = projection.try_map_exprs(|expr| Ok(expr))?;
+ /// assert_eq!(transformed.as_ref().len(), 2);
+ /// # Ok::<(), datafusion_common::DataFusionError>(())
+ /// ```
+ pub fn try_map_exprs<F>(self, mut f: F) -> Result<Self>
+ where
+ F: FnMut(Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>>,
+ {
+ let exprs = self
+ .exprs
+ .into_iter()
+ .map(|mut proj| {
+ proj.expr = f(proj.expr)?;
+ Ok(proj)
+ })
+ .collect::<Result<Vec<_>>>()?;
+ Ok(Self::new(exprs))
+ }
+
    /// Apply another projection on top of this projection, returning the combined projection.
    /// For example, if this projection is `SELECT c@2 AS x, b@1 AS y, a@0 as z` and the other projection is `SELECT x@0 + 1 AS c1, y@1 + z@2 as c2`,
    /// we return a projection equivalent to `SELECT c@2 + 1 AS c1, b@1 + a@0 as c2`.
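A short sketch of the schema-adaptation use case mentioned in the `try_map_exprs` doc comment above, combining it with a `PhysicalExprAdapter`; the schema arguments and the two-column projection are assumptions for illustration only.

```rust
use arrow::datatypes::SchemaRef;
use datafusion_common::Result;
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr_adapter::{
    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory,
};

// Illustrative sketch only: rewrites every projected expression so it can be
// evaluated against batches that use the file (physical) schema. Assumes the
// logical schema has at least two columns.
fn adapt_projection_to_file_schema(
    logical_schema: SchemaRef,
    physical_schema: SchemaRef,
) -> Result<ProjectionExprs> {
    let projection = ProjectionExprs::from_indices(&[0, 1], &logical_schema);
    let adapter = DefaultPhysicalExprAdapterFactory.create(logical_schema, physical_schema);
    projection.try_map_exprs(|expr| adapter.rewrite(expr))
}
```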
diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs
index b9bbaea45a..4110391514 100644
--- a/datafusion/pruning/src/pruning_predicate.rs
+++ b/datafusion/pruning/src/pruning_predicate.rs
@@ -43,6 +43,7 @@ use datafusion_common::{
ScalarValue,
};
use datafusion_expr_common::operator::Operator;
+use datafusion_physical_expr::expressions::CastColumnExpr;
use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee};
use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr;
@@ -1105,6 +1106,20 @@ fn rewrite_expr_to_prunable(
None,
));
Ok((left, op, right))
+    } else if let Some(cast_col) = column_expr_any.downcast_ref::<CastColumnExpr>() {
+        // `cast_column(col) op lit()` - same as CastExpr but uses CastColumnExpr
+ let arrow_schema = schema.as_arrow();
+ let from_type = cast_col.expr().data_type(arrow_schema)?;
+ let to_type = cast_col.target_field().data_type();
+ verify_support_type_for_prune(&from_type, to_type)?;
+        let (left, op, right) =
+            rewrite_expr_to_prunable(cast_col.expr(), op, scalar_expr, schema)?;
+        // Predicate pruning / statistics generally don't support struct columns yet.
+        // In the future we may want to support pruning on nested fields, in which case we probably need to
+        // do something more sophisticated here.
+        // But for now since we don't support pruning on nested fields, we can just cast to the target type directly.
+        let left = Arc::new(phys_expr::CastExpr::new(left, to_type.clone(), None));
+ Ok((left, op, right))
} else if let Some(try_cast) =
column_expr_any.downcast_ref::<phys_expr::TryCastExpr>()
{
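For reference, the expression shape handled by the new branch above: a `CastColumnExpr` carries both the file-side and table-side fields, so the pruning rewrite can pull out the inner column and the target type. The field names and types in this sketch are assumptions, not taken from the commit.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field};
use datafusion_physical_expr::expressions::{CastColumnExpr, Column};

// Illustrative sketch only: shows the accessors the pruning code relies on.
fn cast_column_expr_shape() {
    let cast_col = CastColumnExpr::new(
        Arc::new(Column::new("a", 0)),
        Arc::new(Field::new("a", DataType::Int32, false)), // field as stored in the file
        Arc::new(Field::new("a", DataType::Int64, false)), // field expected by the table schema
        None,
    );
    // The pruning rewrite only needs the inner expression and the target type,
    // which it then re-expresses as a plain CastExpr over the column.
    assert_eq!(cast_col.target_field().data_type(), &DataType::Int64);
    let _inner_column = cast_col.expr();
}
```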
diff --git a/datafusion/sqllogictest/test_files/timestamps.slt b/datafusion/sqllogictest/test_files/timestamps.slt
index 3a1a26257b..3bf11c92e2 100644
--- a/datafusion/sqllogictest/test_files/timestamps.slt
+++ b/datafusion/sqllogictest/test_files/timestamps.slt
@@ -3702,6 +3702,76 @@ FROM ts_data_micros_kolkata
2020-09-08T18:12:29.190+05:30
2020-09-08T17:12:29.190+05:30
+
+##########
+## Casting between timestamp with and without timezone
+##########
+
+# Test casting from Timestamp(Nanosecond, Some("UTC")) to Timestamp(Nanosecond, None)
+# Verifies that the underlying nanosecond values are preserved when removing timezone
+
+# Verify input type
+query T
+SELECT arrow_typeof(arrow_cast(1, 'Timestamp(Nanosecond, Some("UTC"))'));
+----
+Timestamp(ns, "UTC")
+
+# Verify output type after casting
+query T
+SELECT arrow_typeof(arrow_cast(arrow_cast(1, 'Timestamp(Nanosecond, Some("UTC"))'), 'Timestamp(Nanosecond, None)'));
+----
+Timestamp(ns)
+
+# Verify values are preserved when casting from timestamp with timezone to timestamp without timezone
+query P rowsort
+SELECT arrow_cast(column1, 'Timestamp(Nanosecond, None)')
+FROM (VALUES
+ (arrow_cast(1, 'Timestamp(Nanosecond, Some("UTC"))')),
+ (arrow_cast(2, 'Timestamp(Nanosecond, Some("UTC"))')),
+ (arrow_cast(3, 'Timestamp(Nanosecond, Some("UTC"))')),
+ (arrow_cast(4, 'Timestamp(Nanosecond, Some("UTC"))')),
+ (arrow_cast(5, 'Timestamp(Nanosecond, Some("UTC"))'))
+) t;
+----
+1970-01-01T00:00:00.000000001
+1970-01-01T00:00:00.000000002
+1970-01-01T00:00:00.000000003
+1970-01-01T00:00:00.000000004
+1970-01-01T00:00:00.000000005
+
+# Test casting from Timestamp(Nanosecond, None) to Timestamp(Nanosecond, Some("UTC"))
+# Verifies that the underlying nanosecond values are preserved when adding timezone
+
+# Verify input type
+query T
+SELECT arrow_typeof(arrow_cast(1, 'Timestamp(Nanosecond, None)'));
+----
+Timestamp(ns)
+
+# Verify output type after casting
+query T
+SELECT arrow_typeof(arrow_cast(arrow_cast(1, 'Timestamp(Nanosecond, None)'), 'Timestamp(Nanosecond, Some("UTC"))'));
+----
+Timestamp(ns, "UTC")
+
+# Verify values are preserved when casting from timestamp without timezone to timestamp with timezone
+query P rowsort
+SELECT arrow_cast(column1, 'Timestamp(Nanosecond, Some("UTC"))')
+FROM (VALUES
+ (arrow_cast(1, 'Timestamp(Nanosecond, None)')),
+ (arrow_cast(2, 'Timestamp(Nanosecond, None)')),
+ (arrow_cast(3, 'Timestamp(Nanosecond, None)')),
+ (arrow_cast(4, 'Timestamp(Nanosecond, None)')),
+ (arrow_cast(5, 'Timestamp(Nanosecond, None)'))
+) t;
+----
+1970-01-01T00:00:00.000000001Z
+1970-01-01T00:00:00.000000002Z
+1970-01-01T00:00:00.000000003Z
+1970-01-01T00:00:00.000000004Z
+1970-01-01T00:00:00.000000005Z
+
+
##########
## Common timestamp data
##########
diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md
index 00e55ac9e7..caab9ad4e7 100644
--- a/docs/source/library-user-guide/upgrading.md
+++ b/docs/source/library-user-guide/upgrading.md
@@ -369,7 +369,16 @@ let config = FileScanConfigBuilder::new(url, source)
.build();
```
-**Handling projections in `FileSource`:**
+### `SchemaAdapterFactory` Fully Removed from Parquet
+
+Following the deprecation announced in [DataFusion 49.0.0](#deprecating-schemaadapterfactory-and-schemaadapter), `SchemaAdapterFactory` has been fully removed from Parquet scanning. This applies to both:
+
+- **Predicate pushdown / row filtering** (deprecated in 49.0.0)
+- **Projections** (newly removed in 52.0.0)
+
+If you were using a custom `SchemaAdapterFactory` for schema adaptation (e.g., default column values, type coercion), you should now implement `PhysicalExprAdapterFactory` instead.
+
+See the [default column values example](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/custom_data_source/default_column_values.rs) for how to implement a custom `PhysicalExprAdapterFactory`.
### `PhysicalOptimizerRule::optimize` deprecated in favor of `optimize_plan`
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]