This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 4e55768deb feat: Expose Parquet Schema Adapter (#10515)
4e55768deb is described below
commit 4e55768deb926cef021a001cb953ae2fe313615e
Author: Michael Maletich <[email protected]>
AuthorDate: Fri May 17 15:17:38 2024 -0500
feat: Expose Parquet Schema Adapter (#10515)
* feat: Expose Parquet Schema Adapter
---
.../core/src/datasource/file_format/parquet.rs | 6 +-
.../core/src/datasource/physical_plan/mod.rs | 78 +++++-----
.../src/datasource/physical_plan/parquet/mod.rs | 31 +++-
.../physical_plan/parquet/schema_adapter.rs | 69 +++++++++
datafusion/core/tests/parquet/mod.rs | 1 +
datafusion/core/tests/parquet/schema_adapter.rs | 171 +++++++++++++++++++++
6 files changed, 312 insertions(+), 44 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 8182ced6f2..7fcd41049c 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -31,7 +31,8 @@ use crate::arrow::array::{
use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::physical_plan::{
- FileGroupDisplay, FileSinkConfig, ParquetExec, SchemaAdapter,
+ DefaultSchemaAdapterFactory, FileGroupDisplay, FileSinkConfig, ParquetExec,
+ SchemaAdapterFactory,
};
use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
use crate::error::Result;
@@ -470,7 +471,8 @@ async fn fetch_statistics(
let mut null_counts = vec![Precision::Exact(0); num_fields];
let mut has_statistics = false;
- let schema_adapter = SchemaAdapter::new(table_schema.clone());
+ let schema_adapter =
+ DefaultSchemaAdapterFactory::default().create(table_schema.clone());
let (mut max_values, mut min_values) = create_max_min_accs(&table_schema);
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index c450774572..6e19961f60 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -31,7 +31,19 @@ mod statistics;
pub(crate) use self::csv::plan_to_csv;
pub(crate) use self::json::plan_to_json;
#[cfg(feature = "parquet")]
-pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
+pub use self::parquet::{
+ ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory, SchemaAdapter,
+ SchemaAdapterFactory, SchemaMapper,
+};
+#[cfg(feature = "parquet")]
+use arrow::{
+ array::new_null_array,
+ compute::{can_cast_types, cast},
+ datatypes::Schema,
+ record_batch::{RecordBatch, RecordBatchOptions},
+};
+#[cfg(feature = "parquet")]
+use datafusion_common::plan_err;
pub use arrow_file::ArrowExec;
pub use avro::AvroExec;
@@ -61,13 +73,7 @@ use crate::{
physical_plan::display::{display_orderings, ProjectSchemaDisplay},
};
-use arrow::{
- array::new_null_array,
- compute::{can_cast_types, cast},
- datatypes::{DataType, Schema, SchemaRef},
- record_batch::{RecordBatch, RecordBatchOptions},
-};
-use datafusion_common::plan_err;
+use arrow::datatypes::{DataType, SchemaRef};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalSortExpr;
@@ -241,39 +247,31 @@ where
Ok(())
}
-/// A utility which can adapt file-level record batches to a table schema which may have a schema
-/// obtained from merging multiple file-level schemas.
-///
-/// This is useful for enabling schema evolution in partitioned datasets.
-///
-/// This has to be done in two stages.
-///
-/// 1. Before reading the file, we have to map projected column indexes from the table schema to
-/// the file schema.
-///
-/// 2. After reading a record batch we need to map the read columns back to the expected columns
-/// indexes and insert null-valued columns wherever the file schema was missing a colum present
-/// in the table schema.
+#[cfg(feature = "parquet")]
+#[derive(Clone, Debug, Default)]
+pub(crate) struct DefaultSchemaAdapterFactory {}
+
+#[cfg(feature = "parquet")]
+impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
+ fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+ Box::new(DefaultSchemaAdapter { table_schema })
+ }
+}
+
+#[cfg(feature = "parquet")]
#[derive(Clone, Debug)]
-pub(crate) struct SchemaAdapter {
+pub(crate) struct DefaultSchemaAdapter {
/// Schema for the table
table_schema: SchemaRef,
}
-impl SchemaAdapter {
- pub(crate) fn new(table_schema: SchemaRef) -> SchemaAdapter {
- Self { table_schema }
- }
-
+#[cfg(feature = "parquet")]
+impl SchemaAdapter for DefaultSchemaAdapter {
/// Map a column index in the table schema to a column index in a particular
/// file schema
///
/// Panics if index is not in range for the table schema
- pub(crate) fn map_column_index(
- &self,
- index: usize,
- file_schema: &Schema,
- ) -> Option<usize> {
+ fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
let field = self.table_schema.field(index);
Some(file_schema.fields.find(field.name())?.0)
}
@@ -286,10 +284,10 @@ impl SchemaAdapter {
///
/// Returns a [`SchemaMapping`] that can be applied to the output batch
/// along with an ordered list of columns to project from the file
- pub fn map_schema(
+ fn map_schema(
&self,
file_schema: &Schema,
- ) -> Result<(SchemaMapping, Vec<usize>)> {
+ ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
let mut projection = Vec::with_capacity(file_schema.fields().len());
let mut field_mappings = vec![None; self.table_schema.fields().len()];
@@ -315,10 +313,10 @@ impl SchemaAdapter {
}
Ok((
- SchemaMapping {
+ Arc::new(SchemaMapping {
table_schema: self.table_schema.clone(),
field_mappings,
- },
+ }),
projection,
))
}
@@ -326,6 +324,7 @@ impl SchemaAdapter {
/// The SchemaMapping struct holds a mapping from the file schema to the table schema
/// and any necessary type conversions that need to be applied.
+#[cfg(feature = "parquet")]
#[derive(Debug)]
pub struct SchemaMapping {
/// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result.
@@ -334,7 +333,8 @@ pub struct SchemaMapping {
field_mappings: Vec<Option<usize>>,
}
-impl SchemaMapping {
+#[cfg(feature = "parquet")]
+impl SchemaMapper for SchemaMapping {
/// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
let batch_rows = batch.num_rows();
@@ -636,7 +636,7 @@ mod tests {
Field::new("c3", DataType::Float64, true),
]));
- let adapter = SchemaAdapter::new(table_schema.clone());
+ let adapter = DefaultSchemaAdapterFactory::default().create(table_schema.clone());
let file_schema = Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
@@ -693,7 +693,7 @@ mod tests {
let indices = vec![1, 2, 4];
let schema = SchemaRef::from(table_schema.project(&indices).unwrap());
- let adapter = SchemaAdapter::new(schema);
+ let adapter = DefaultSchemaAdapterFactory::default().create(schema);
let (mapping, projection) = adapter.map_schema(&file_schema).unwrap();
let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index a48f510adb..7509d08ad8 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -27,8 +27,8 @@ use crate::datasource::physical_plan::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::datasource::physical_plan::{
- parquet::page_filter::PagePruningPredicate, DisplayAs, FileGroupPartitioner,
- FileMeta, FileScanConfig, SchemaAdapter,
+ parquet::page_filter::PagePruningPredicate, DefaultSchemaAdapterFactory, DisplayAs,
+ FileGroupPartitioner, FileMeta, FileScanConfig,
};
use crate::{
config::{ConfigOptions, TableParquetOptions},
@@ -67,9 +67,11 @@ mod metrics;
mod page_filter;
mod row_filter;
mod row_groups;
+mod schema_adapter;
mod statistics;
pub use metrics::ParquetFileMetrics;
+pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
@@ -93,6 +95,8 @@ pub struct ParquetExec {
cache: PlanProperties,
/// Options for reading Parquet files
table_parquet_options: TableParquetOptions,
+ /// Optional user defined schema adapter
+ schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}
impl ParquetExec {
@@ -157,6 +161,7 @@ impl ParquetExec {
parquet_file_reader_factory: None,
cache,
table_parquet_options,
+ schema_adapter_factory: None,
}
}
@@ -195,6 +200,19 @@ impl ParquetExec {
self
}
+ /// Optional schema adapter factory.
+ ///
+ /// `SchemaAdapterFactory` allows users to specify how fields from the parquet file get mapped to
+ /// those of the table schema. The default schema adapter uses arrow's cast library to map
+ /// the parquet fields to the table schema.
+ pub fn with_schema_adapter_factory(
+ mut self,
+ schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+ ) -> Self {
+ self.schema_adapter_factory = Some(schema_adapter_factory);
+ self
+ }
+
/// If true, any filter [`Expr`]s on the scan will be converted to a
/// [`RowFilter`](parquet::arrow::arrow_reader::RowFilter) in the
/// `ParquetRecordBatchStream`. These filters are applied by the
@@ -402,6 +420,11 @@ impl ExecutionPlan for ParquetExec {
})
})?;
+ let schema_adapter_factory = self
+ .schema_adapter_factory
+ .clone()
+ .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory::default()));
+
let opener = ParquetOpener {
partition_index,
projection: Arc::from(projection),
@@ -418,6 +441,7 @@ impl ExecutionPlan for ParquetExec {
reorder_filters: self.reorder_filters(),
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
+ schema_adapter_factory,
};
let stream =
@@ -452,6 +476,7 @@ struct ParquetOpener {
reorder_filters: bool,
enable_page_index: bool,
enable_bloom_filter: bool,
+ schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
}
impl FileOpener for ParquetOpener {
@@ -475,7 +500,7 @@ impl FileOpener for ParquetOpener {
let batch_size = self.batch_size;
let projection = self.projection.clone();
let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?);
- let schema_adapter = SchemaAdapter::new(projected_schema);
+ let schema_adapter = self.schema_adapter_factory.create(projected_schema);
let predicate = self.predicate.clone();
let pruning_predicate = self.pruning_predicate.clone();
let page_pruning_predicate = self.page_pruning_predicate.clone();
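As an illustration (not part of the diff), a caller could wire a custom factory into the builder method added above in roughly this way. `MySchemaAdapterFactory` and `config` are hypothetical placeholders; a concrete factory implementation appears in the new test file later in this patch.

    use std::sync::Arc;
    use datafusion::datasource::physical_plan::{ParquetExec, SchemaAdapterFactory};

    // Hypothetical factory implementing the SchemaAdapterFactory trait
    // introduced by this commit.
    let factory: Arc<dyn SchemaAdapterFactory> = Arc::new(MySchemaAdapterFactory {});

    // ParquetExec::new takes the FileScanConfig, optional predicate, optional
    // metadata size hint, and TableParquetOptions, as before; the new builder
    // method attaches the factory. When no factory is supplied, execute()
    // falls back to DefaultSchemaAdapterFactory.
    let exec = ParquetExec::new(config, None, None, Default::default())
        .with_schema_adapter_factory(factory);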
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs b/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
new file mode 100644
index 0000000000..193e5161a3
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::RecordBatch;
+use arrow_schema::{Schema, SchemaRef};
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Factory of schema adapters.
+///
+/// Provides means to implement custom schema adaptation.
+pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
+ /// Provides `SchemaAdapter` for the ParquetExec.
+ fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
+}
+
+/// A utility which can adapt file-level record batches to a table schema which may have a schema
+/// obtained from merging multiple file-level schemas.
+///
+/// This is useful for enabling schema evolution in partitioned datasets.
+///
+/// This has to be done in two stages.
+///
+/// 1. Before reading the file, we have to map projected column indexes from the table schema to
+/// the file schema.
+///
+/// 2. After reading a record batch we need to map the read columns back to the expected column
+/// indexes and insert null-valued columns wherever the file schema was missing a column present
+/// in the table schema.
+pub trait SchemaAdapter: Send + Sync {
+ /// Map a column index in the table schema to a column index in a particular
+ /// file schema
+ ///
+ /// Panics if index is not in range for the table schema
+ fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;
+
+ /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
+ ///
+ /// If the provided `file_schema` contains columns of a different type to the expected
+ /// `table_schema`, the method will attempt to cast the array data from the file schema
+ /// to the table schema where possible.
+ ///
+ /// Returns a [`SchemaMapper`] that can be applied to the output batch
+ /// along with an ordered list of columns to project from the file
+ fn map_schema(
+ &self,
+ file_schema: &Schema,
+ ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
+}
+
+/// Transforms a RecordBatch from Parquet to a RecordBatch that meets the table schema.
+pub trait SchemaMapper: Send + Sync {
+ /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
+ fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
+}
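To make the two-stage flow described in the trait docs concrete, here is a rough sketch (illustrative only, not part of the diff) of how these traits are exercised on the read path; `schema_adapter_factory`, `projected_table_schema`, `file_schema`, and `batch` are assumed to come from the surrounding ParquetOpener plumbing.

    // Stage 1: before reading the file, build an adapter for the (projected)
    // table schema and derive which file columns to read, plus a mapper back
    // to the table schema.
    let adapter: Box<dyn SchemaAdapter> = schema_adapter_factory.create(projected_table_schema);
    let (mapper, projection): (Arc<dyn SchemaMapper>, Vec<usize>) =
        adapter.map_schema(&file_schema)?;

    // Stage 2: after reading, each RecordBatch produced with `projection`
    // is adapted back to the table schema (casting types and inserting
    // null columns for fields the file is missing).
    let adapted: RecordBatch = mapper.map_batch(batch)?;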
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index bb938e3af4..fe839bf1bc 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -48,6 +48,7 @@ mod filter_pushdown;
mod page_pruning;
mod row_group_pruning;
mod schema;
+mod schema_adapter;
mod schema_coercion;
#[cfg(test)]
diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs
new file mode 100644
index 0000000000..10c4e8a4c0
--- /dev/null
+++ b/datafusion/core/tests/parquet/schema_adapter.rs
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fs;
+use std::sync::Arc;
+
+use arrow::datatypes::{Field, Schema};
+use arrow::record_batch::RecordBatch;
+use arrow_array::{Int32Array, StringArray};
+use arrow_schema::{DataType, SchemaRef};
+use datafusion::assert_batches_sorted_eq;
+use object_store::path::Path;
+use object_store::ObjectMeta;
+
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::datasource::physical_plan::{
+ FileScanConfig, ParquetExec, SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
+};
+use datafusion::physical_plan::{collect, Statistics};
+use datafusion::prelude::SessionContext;
+
+use datafusion::datasource::listing::PartitionedFile;
+use parquet::arrow::ArrowWriter;
+use tempfile::TempDir;
+
+#[tokio::test]
+async fn can_override_schema_adapter() {
+ // Create several parquet files in the same directory / table with
+ // same schema but different metadata
+ let tmp_dir = TempDir::new().unwrap();
+ let table_dir = tmp_dir.path().join("parquet_test");
+ fs::DirBuilder::new().create(table_dir.as_path()).unwrap();
+ let f1 = Field::new("id", DataType::Int32, true);
+
+ let file_schema = Arc::new(Schema::new(vec![f1.clone()]));
+ let filename = "part.parquet".to_string();
+ let path = table_dir.as_path().join(filename.clone());
+ let file = fs::File::create(path.clone()).unwrap();
+ let mut writer = ArrowWriter::try_new(file, file_schema.clone(), None).unwrap();
+
+ let ids = Arc::new(Int32Array::from(vec![1i32]));
+ let rec_batch = RecordBatch::try_new(file_schema.clone(), vec![ids]).unwrap();
+
+ writer.write(&rec_batch).unwrap();
+ writer.close().unwrap();
+
+ let location = Path::parse(path.to_str().unwrap()).unwrap();
+ let metadata = std::fs::metadata(path.as_path()).expect("Local file metadata");
+ let meta = ObjectMeta {
+ location,
+ last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
+ size: metadata.len() as usize,
+ e_tag: None,
+ version: None,
+ };
+
+ let partitioned_file = PartitionedFile {
+ object_meta: meta,
+ partition_values: vec![],
+ range: None,
+ statistics: None,
+ extensions: None,
+ };
+
+ let f1 = Field::new("id", DataType::Int32, true);
+ let f2 = Field::new("extra_column", DataType::Utf8, true);
+
+ let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
+
+ // prepare the scan
+ let parquet_exec = ParquetExec::new(
+ FileScanConfig {
+ object_store_url: ObjectStoreUrl::local_filesystem(),
+ file_groups: vec![vec![partitioned_file]],
+ statistics: Statistics::new_unknown(&schema),
+ file_schema: schema,
+ projection: None,
+ limit: None,
+ table_partition_cols: vec![],
+ output_ordering: vec![],
+ },
+ None,
+ None,
+ Default::default(),
+ )
+ .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}));
+
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+ let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap();
+
+ let expected = [
+ "+----+--------------+",
+ "| id | extra_column |",
+ "+----+--------------+",
+ "| 1 | foo |",
+ "+----+--------------+",
+ ];
+
+ assert_batches_sorted_eq!(expected, &read);
+}
+
+#[derive(Debug)]
+struct TestSchemaAdapterFactory {}
+
+impl SchemaAdapterFactory for TestSchemaAdapterFactory {
+ fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+ Box::new(TestSchemaAdapter {
+ table_schema: schema,
+ })
+ }
+}
+
+struct TestSchemaAdapter {
+ /// Schema for the table
+ table_schema: SchemaRef,
+}
+
+impl SchemaAdapter for TestSchemaAdapter {
+ fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+ let field = self.table_schema.field(index);
+ Some(file_schema.fields.find(field.name())?.0)
+ }
+
+ fn map_schema(
+ &self,
+ file_schema: &Schema,
+ ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+ let mut projection = Vec::with_capacity(file_schema.fields().len());
+
+ for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+ if self.table_schema.fields().find(file_field.name()).is_some() {
+ projection.push(file_idx);
+ }
+ }
+
+ Ok((Arc::new(TestSchemaMapping {}), projection))
+ }
+}
+
+#[derive(Debug)]
+struct TestSchemaMapping {}
+
+impl SchemaMapper for TestSchemaMapping {
+ fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
+ let f1 = Field::new("id", DataType::Int32, true);
+ let f2 = Field::new("extra_column", DataType::Utf8, true);
+
+ let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
+
+ let extra_column = Arc::new(StringArray::from(vec!["foo"]));
+ let mut new_columns = batch.columns().to_vec();
+ new_columns.push(extra_column);
+
+ Ok(RecordBatch::try_new(schema, new_columns).unwrap())
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]