This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e55768deb feat: Expose Parquet Schema Adapter (#10515)
4e55768deb is described below

commit 4e55768deb926cef021a001cb953ae2fe313615e
Author: Michael Maletich <[email protected]>
AuthorDate: Fri May 17 15:17:38 2024 -0500

    feat: Expose Parquet Schema Adapter (#10515)
    
    * feat: Expose Parquet Schema Adapter
---
 .../core/src/datasource/file_format/parquet.rs     |   6 +-
 .../core/src/datasource/physical_plan/mod.rs       |  78 +++++-----
 .../src/datasource/physical_plan/parquet/mod.rs    |  31 +++-
 .../physical_plan/parquet/schema_adapter.rs        |  69 +++++++++
 datafusion/core/tests/parquet/mod.rs               |   1 +
 datafusion/core/tests/parquet/schema_adapter.rs    | 171 +++++++++++++++++++++
 6 files changed, 312 insertions(+), 44 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 8182ced6f2..7fcd41049c 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -31,7 +31,8 @@ use crate::arrow::array::{
 use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::physical_plan::{
-    FileGroupDisplay, FileSinkConfig, ParquetExec, SchemaAdapter,
+    DefaultSchemaAdapterFactory, FileGroupDisplay, FileSinkConfig, ParquetExec,
+    SchemaAdapterFactory,
 };
 use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
@@ -470,7 +471,8 @@ async fn fetch_statistics(
     let mut null_counts = vec![Precision::Exact(0); num_fields];
     let mut has_statistics = false;
 
-    let schema_adapter = SchemaAdapter::new(table_schema.clone());
+    let schema_adapter =
+        DefaultSchemaAdapterFactory::default().create(table_schema.clone());
 
     let (mut max_values, mut min_values) = create_max_min_accs(&table_schema);
 
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index c450774572..6e19961f60 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -31,7 +31,19 @@ mod statistics;
 pub(crate) use self::csv::plan_to_csv;
 pub(crate) use self::json::plan_to_json;
 #[cfg(feature = "parquet")]
-pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
+pub use self::parquet::{
+    ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory, SchemaAdapter,
+    SchemaAdapterFactory, SchemaMapper,
+};
+#[cfg(feature = "parquet")]
+use arrow::{
+    array::new_null_array,
+    compute::{can_cast_types, cast},
+    datatypes::Schema,
+    record_batch::{RecordBatch, RecordBatchOptions},
+};
+#[cfg(feature = "parquet")]
+use datafusion_common::plan_err;
 
 pub use arrow_file::ArrowExec;
 pub use avro::AvroExec;
@@ -61,13 +73,7 @@ use crate::{
     physical_plan::display::{display_orderings, ProjectSchemaDisplay},
 };
 
-use arrow::{
-    array::new_null_array,
-    compute::{can_cast_types, cast},
-    datatypes::{DataType, Schema, SchemaRef},
-    record_batch::{RecordBatch, RecordBatchOptions},
-};
-use datafusion_common::plan_err;
+use arrow::datatypes::{DataType, SchemaRef};
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::PhysicalSortExpr;
 
@@ -241,39 +247,31 @@ where
     Ok(())
 }
 
-/// A utility which can adapt file-level record batches to a table schema which may have a schema
-/// obtained from merging multiple file-level schemas.
-///
-/// This is useful for enabling schema evolution in partitioned datasets.
-///
-/// This has to be done in two stages.
-///
-/// 1. Before reading the file, we have to map projected column indexes from the table schema to
-///    the file schema.
-///
-/// 2. After reading a record batch we need to map the read columns back to the expected columns
-///    indexes and insert null-valued columns wherever the file schema was missing a colum present
-///    in the table schema.
+#[cfg(feature = "parquet")]
+#[derive(Clone, Debug, Default)]
+pub(crate) struct DefaultSchemaAdapterFactory {}
+
+#[cfg(feature = "parquet")]
+impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
+    fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+        Box::new(DefaultSchemaAdapter { table_schema })
+    }
+}
+
+#[cfg(feature = "parquet")]
 #[derive(Clone, Debug)]
-pub(crate) struct SchemaAdapter {
+pub(crate) struct DefaultSchemaAdapter {
     /// Schema for the table
     table_schema: SchemaRef,
 }
 
-impl SchemaAdapter {
-    pub(crate) fn new(table_schema: SchemaRef) -> SchemaAdapter {
-        Self { table_schema }
-    }
-
+#[cfg(feature = "parquet")]
+impl SchemaAdapter for DefaultSchemaAdapter {
     /// Map a column index in the table schema to a column index in a particular
     /// file schema
     ///
     /// Panics if index is not in range for the table schema
-    pub(crate) fn map_column_index(
-        &self,
-        index: usize,
-        file_schema: &Schema,
-    ) -> Option<usize> {
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
         let field = self.table_schema.field(index);
         Some(file_schema.fields.find(field.name())?.0)
     }
@@ -286,10 +284,10 @@ impl SchemaAdapter {
     ///
     /// Returns a [`SchemaMapping`] that can be applied to the output batch
     /// along with an ordered list of columns to project from the file
-    pub fn map_schema(
+    fn map_schema(
         &self,
         file_schema: &Schema,
-    ) -> Result<(SchemaMapping, Vec<usize>)> {
+    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
         let mut projection = Vec::with_capacity(file_schema.fields().len());
         let mut field_mappings = vec![None; self.table_schema.fields().len()];
 
@@ -315,10 +313,10 @@ impl SchemaAdapter {
         }
 
         Ok((
-            SchemaMapping {
+            Arc::new(SchemaMapping {
                 table_schema: self.table_schema.clone(),
                 field_mappings,
-            },
+            }),
             projection,
         ))
     }
@@ -326,6 +324,7 @@ impl SchemaAdapter {
 
 /// The SchemaMapping struct holds a mapping from the file schema to the table schema
 /// and any necessary type conversions that need to be applied.
+#[cfg(feature = "parquet")]
 #[derive(Debug)]
 pub struct SchemaMapping {
     /// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result.
@@ -334,7 +333,8 @@ pub struct SchemaMapping {
     field_mappings: Vec<Option<usize>>,
 }
 
-impl SchemaMapping {
+#[cfg(feature = "parquet")]
+impl SchemaMapper for SchemaMapping {
     /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
     fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
         let batch_rows = batch.num_rows();
@@ -636,7 +636,7 @@ mod tests {
             Field::new("c3", DataType::Float64, true),
         ]));
 
-        let adapter = SchemaAdapter::new(table_schema.clone());
+        let adapter = DefaultSchemaAdapterFactory::default().create(table_schema.clone());
 
         let file_schema = Schema::new(vec![
             Field::new("c1", DataType::Utf8, true),
@@ -693,7 +693,7 @@ mod tests {
 
         let indices = vec![1, 2, 4];
         let schema = SchemaRef::from(table_schema.project(&indices).unwrap());
-        let adapter = SchemaAdapter::new(schema);
+        let adapter = DefaultSchemaAdapterFactory::default().create(schema);
         let (mapping, projection) = adapter.map_schema(&file_schema).unwrap();
 
         let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index a48f510adb..7509d08ad8 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -27,8 +27,8 @@ use crate::datasource::physical_plan::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
 };
 use crate::datasource::physical_plan::{
-    parquet::page_filter::PagePruningPredicate, DisplayAs, FileGroupPartitioner,
-    FileMeta, FileScanConfig, SchemaAdapter,
+    parquet::page_filter::PagePruningPredicate, DefaultSchemaAdapterFactory, DisplayAs,
+    FileGroupPartitioner, FileMeta, FileScanConfig,
 };
 use crate::{
     config::{ConfigOptions, TableParquetOptions},
@@ -67,9 +67,11 @@ mod metrics;
 mod page_filter;
 mod row_filter;
 mod row_groups;
+mod schema_adapter;
 mod statistics;
 
 pub use metrics::ParquetFileMetrics;
+pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
 
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
@@ -93,6 +95,8 @@ pub struct ParquetExec {
     cache: PlanProperties,
     /// Options for reading Parquet files
     table_parquet_options: TableParquetOptions,
+    /// Optional user defined schema adapter
+    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
 }
 
 impl ParquetExec {
@@ -157,6 +161,7 @@ impl ParquetExec {
             parquet_file_reader_factory: None,
             cache,
             table_parquet_options,
+            schema_adapter_factory: None,
         }
     }
 
@@ -195,6 +200,19 @@ impl ParquetExec {
         self
     }
 
+    /// Optional schema adapter factory.
+    ///
+    /// `SchemaAdapterFactory` allows users to specify how fields from the parquet file get mapped to
+    ///  that of the table schema.  The default schema adapter uses arrow's cast library to map
+    ///  the parquet fields to the table schema.
+    pub fn with_schema_adapter_factory(
+        mut self,
+        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    ) -> Self {
+        self.schema_adapter_factory = Some(schema_adapter_factory);
+        self
+    }
+
     /// If true, any filter [`Expr`]s on the scan will converted to a
     /// [`RowFilter`](parquet::arrow::arrow_reader::RowFilter) in the
     /// `ParquetRecordBatchStream`. These filters are applied by the
@@ -402,6 +420,11 @@ impl ExecutionPlan for ParquetExec {
                     })
             })?;
 
+        let schema_adapter_factory = self
+            .schema_adapter_factory
+            .clone()
+            .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory::default()));
+
         let opener = ParquetOpener {
             partition_index,
             projection: Arc::from(projection),
@@ -418,6 +441,7 @@ impl ExecutionPlan for ParquetExec {
             reorder_filters: self.reorder_filters(),
             enable_page_index: self.enable_page_index(),
             enable_bloom_filter: self.bloom_filter_on_read(),
+            schema_adapter_factory,
         };
 
         let stream =
@@ -452,6 +476,7 @@ struct ParquetOpener {
     reorder_filters: bool,
     enable_page_index: bool,
     enable_bloom_filter: bool,
+    schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
 }
 
 impl FileOpener for ParquetOpener {
@@ -475,7 +500,7 @@ impl FileOpener for ParquetOpener {
         let batch_size = self.batch_size;
         let projection = self.projection.clone();
         let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?);
-        let schema_adapter = SchemaAdapter::new(projected_schema);
+        let schema_adapter = self.schema_adapter_factory.create(projected_schema);
         let predicate = self.predicate.clone();
         let pruning_predicate = self.pruning_predicate.clone();
         let page_pruning_predicate = self.page_pruning_predicate.clone();
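
A rough usage sketch (not from the commit) of the new `with_schema_adapter_factory` builder: the `build_exec` helper and the factory argument are illustrative, and the constructor arguments mirror those used in the new test further below.

    use std::sync::Arc;
    use datafusion::datasource::physical_plan::{
        FileScanConfig, ParquetExec, SchemaAdapterFactory,
    };

    // Build a ParquetExec that adapts file schemas to the table schema with a
    // user-provided factory instead of the DefaultSchemaAdapterFactory.
    fn build_exec(
        base_config: FileScanConfig,
        factory: Arc<dyn SchemaAdapterFactory>,
    ) -> ParquetExec {
        // predicate = None, metadata size hint = None, default parquet options
        ParquetExec::new(base_config, None, None, Default::default())
            .with_schema_adapter_factory(factory)
    }
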
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs b/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
new file mode 100644
index 0000000000..193e5161a3
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::RecordBatch;
+use arrow_schema::{Schema, SchemaRef};
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Factory of schema adapters.
+///
+/// Provides means to implement custom schema adaptation.
+pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
+    /// Provides `SchemaAdapter` for the ParquetExec.
+    fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
+}
+
+/// A utility which can adapt file-level record batches to a table schema which may have a schema
+/// obtained from merging multiple file-level schemas.
+///
+/// This is useful for enabling schema evolution in partitioned datasets.
+///
+/// This has to be done in two stages.
+///
+/// 1. Before reading the file, we have to map projected column indexes from the table schema to
+///    the file schema.
+///
+/// 2. After reading a record batch we need to map the read columns back to the expected columns
+///    indexes and insert null-valued columns wherever the file schema was missing a column present
+///    in the table schema.
+pub trait SchemaAdapter: Send + Sync {
+    /// Map a column index in the table schema to a column index in a particular
+    /// file schema
+    ///
+    /// Panics if index is not in range for the table schema
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;
+
+    /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
+    ///
+    /// If the provided `file_schema` contains columns of a different type to the expected
+    /// `table_schema`, the method will attempt to cast the array data from the file schema
+    /// to the table schema where possible.
+    ///
+    /// Returns a [`SchemaMapper`] that can be applied to the output batch
+    /// along with an ordered list of columns to project from the file
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
+}
+
+/// Transforms a RecordBatch from Parquet to a RecordBatch that meets the table schema.
+pub trait SchemaMapper: Send + Sync {
+    /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
+    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
+}
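
The two stages described above, as a hedged sketch (not from the commit): `map_schema` runs before the file is read to derive the column projection, and the returned `SchemaMapper` is applied to every decoded batch afterwards. The `adapt` helper and the `read_projected_batch` callback are illustrative stand-ins for the Parquet reader.

    use arrow_array::RecordBatch;
    use arrow_schema::Schema;
    use datafusion::datasource::physical_plan::SchemaAdapter;
    use datafusion_common::Result;

    fn adapt(
        adapter: &dyn SchemaAdapter,
        file_schema: &Schema,
        read_projected_batch: impl Fn(&[usize]) -> Result<RecordBatch>,
    ) -> Result<RecordBatch> {
        // Stage 1: before reading, decide which file columns to project.
        let (mapper, projection) = adapter.map_schema(file_schema)?;
        // Stage 2: after reading, cast/reorder the columns (the default
        // adapter also fills columns missing from the file with nulls).
        let batch = read_projected_batch(&projection)?;
        mapper.map_batch(batch)
    }
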
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index bb938e3af4..fe839bf1bc 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -48,6 +48,7 @@ mod filter_pushdown;
 mod page_pruning;
 mod row_group_pruning;
 mod schema;
+mod schema_adapter;
 mod schema_coercion;
 
 #[cfg(test)]
diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs
new file mode 100644
index 0000000000..10c4e8a4c0
--- /dev/null
+++ b/datafusion/core/tests/parquet/schema_adapter.rs
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fs;
+use std::sync::Arc;
+
+use arrow::datatypes::{Field, Schema};
+use arrow::record_batch::RecordBatch;
+use arrow_array::{Int32Array, StringArray};
+use arrow_schema::{DataType, SchemaRef};
+use datafusion::assert_batches_sorted_eq;
+use object_store::path::Path;
+use object_store::ObjectMeta;
+
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::datasource::physical_plan::{
+    FileScanConfig, ParquetExec, SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
+};
+use datafusion::physical_plan::{collect, Statistics};
+use datafusion::prelude::SessionContext;
+
+use datafusion::datasource::listing::PartitionedFile;
+use parquet::arrow::ArrowWriter;
+use tempfile::TempDir;
+
+#[tokio::test]
+async fn can_override_schema_adapter() {
+    // Create several parquet files in the same directory / table with
+    // same schema but different metadata
+    let tmp_dir = TempDir::new().unwrap();
+    let table_dir = tmp_dir.path().join("parquet_test");
+    fs::DirBuilder::new().create(table_dir.as_path()).unwrap();
+    let f1 = Field::new("id", DataType::Int32, true);
+
+    let file_schema = Arc::new(Schema::new(vec![f1.clone()]));
+    let filename = "part.parquet".to_string();
+    let path = table_dir.as_path().join(filename.clone());
+    let file = fs::File::create(path.clone()).unwrap();
+    let mut writer = ArrowWriter::try_new(file, file_schema.clone(), None).unwrap();
+
+    let ids = Arc::new(Int32Array::from(vec![1i32]));
+    let rec_batch = RecordBatch::try_new(file_schema.clone(), vec![ids]).unwrap();
+
+    writer.write(&rec_batch).unwrap();
+    writer.close().unwrap();
+
+    let location = Path::parse(path.to_str().unwrap()).unwrap();
+    let metadata = std::fs::metadata(path.as_path()).expect("Local file metadata");
+    let meta = ObjectMeta {
+        location,
+        last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
+        size: metadata.len() as usize,
+        e_tag: None,
+        version: None,
+    };
+
+    let partitioned_file = PartitionedFile {
+        object_meta: meta,
+        partition_values: vec![],
+        range: None,
+        statistics: None,
+        extensions: None,
+    };
+
+    let f1 = Field::new("id", DataType::Int32, true);
+    let f2 = Field::new("extra_column", DataType::Utf8, true);
+
+    let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
+
+    // prepare the scan
+    let parquet_exec = ParquetExec::new(
+        FileScanConfig {
+            object_store_url: ObjectStoreUrl::local_filesystem(),
+            file_groups: vec![vec![partitioned_file]],
+            statistics: Statistics::new_unknown(&schema),
+            file_schema: schema,
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            output_ordering: vec![],
+        },
+        None,
+        None,
+        Default::default(),
+    )
+    .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}));
+
+    let session_ctx = SessionContext::new();
+    let task_ctx = session_ctx.task_ctx();
+    let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap();
+
+    let expected = [
+        "+----+--------------+",
+        "| id | extra_column |",
+        "+----+--------------+",
+        "| 1  | foo          |",
+        "+----+--------------+",
+    ];
+
+    assert_batches_sorted_eq!(expected, &read);
+}
+
+#[derive(Debug)]
+struct TestSchemaAdapterFactory {}
+
+impl SchemaAdapterFactory for TestSchemaAdapterFactory {
+    fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+        Box::new(TestSchemaAdapter {
+            table_schema: schema,
+        })
+    }
+}
+
+struct TestSchemaAdapter {
+    /// Schema for the table
+    table_schema: SchemaRef,
+}
+
+impl SchemaAdapter for TestSchemaAdapter {
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+        let field = self.table_schema.field(index);
+        Some(file_schema.fields.find(field.name())?.0)
+    }
+
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+        let mut projection = Vec::with_capacity(file_schema.fields().len());
+
+        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+            if self.table_schema.fields().find(file_field.name()).is_some() {
+                projection.push(file_idx);
+            }
+        }
+
+        Ok((Arc::new(TestSchemaMapping {}), projection))
+    }
+}
+
+#[derive(Debug)]
+struct TestSchemaMapping {}
+
+impl SchemaMapper for TestSchemaMapping {
+    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
+        let f1 = Field::new("id", DataType::Int32, true);
+        let f2 = Field::new("extra_column", DataType::Utf8, true);
+
+        let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
+
+        let extra_column = Arc::new(StringArray::from(vec!["foo"]));
+        let mut new_columns = batch.columns().to_vec();
+        new_columns.push(extra_column);
+
+        Ok(RecordBatch::try_new(schema, new_columns).unwrap())
+    }
+}

