alamb commented on code in PR #10680:
URL: https://github.com/apache/datafusion/pull/10680#discussion_r1615806596
##########
datafusion/core/src/datasource/schema_adapter.rs:
##########
@@ -0,0 +1,337 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Schema Adapter provides a method of translating the RecordBatches that come out of the
+//! physical format into how they should be used by DataFusion. For instance, a schema
+//! can be stored external to a parquet file that maps parquet logical types to arrow types.
+
+use arrow::compute::{can_cast_types, cast};
+use arrow_array::{new_null_array, RecordBatch, RecordBatchOptions};
+use arrow_schema::{Schema, SchemaRef};
+use datafusion_common::plan_err;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Factory of schema adapters.
+///
+/// Provides means to implement custom schema adaptation.
+pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
+    /// Provides `SchemaAdapter`.
+    fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
+}
+
+/// A utility which can adapt file-level record batches to a table schema which may have a schema
+/// obtained from merging multiple file-level schemas.
+///
+/// This is useful for enabling schema evolution in partitioned datasets.
+///
+/// This has to be done in two stages.
+///
+/// 1. Before reading the file, we have to map projected column indexes from the table schema to
+///    the file schema.
+///
+/// 2. After reading a record batch we need to map the read columns back to the expected column
+///    indexes and insert null-valued columns wherever the file schema was missing a column present
+///    in the table schema.
+pub trait SchemaAdapter: Send + Sync {
+    /// Map a column index in the table schema to a column index in a particular
+    /// file schema
+    ///
+    /// Panics if index is not in range for the table schema
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;
+
+    /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
+    ///
+    /// If the provided `file_schema` contains columns of a different type to the expected
+    /// `table_schema`, the method will attempt to cast the array data from the file schema
+    /// to the table schema where possible.
+    ///
+    /// Returns a [`SchemaMapper`] that can be applied to the output batch
+    /// along with an ordered list of columns to project from the file
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
+}
+
+/// Transforms a RecordBatch from the physical layer to a RecordBatch that meets the table schema.
+pub trait SchemaMapper: Send + Sync {
+    /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
+    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
+}
+
+#[derive(Clone, Debug, Default)]
+pub(crate) struct DefaultSchemaAdapterFactory {}
+
+impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
+    fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+        Box::new(DefaultSchemaAdapter { table_schema })
+    }
+}
+
+#[derive(Clone, Debug)]
+pub(crate) struct DefaultSchemaAdapter {
+    /// Schema for the table
+    table_schema: SchemaRef,
+}
+
+impl SchemaAdapter for DefaultSchemaAdapter {
+    /// Map a column index in the table schema to a column index in a particular
+    /// file schema
+    ///
+    /// Panics if index is not in range for the table schema
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+        let field = self.table_schema.field(index);
+        Some(file_schema.fields.find(field.name())?.0)
+    }
+
+    /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
+    ///
+    /// If the provided `file_schema` contains columns of a different type to the expected
+    /// `table_schema`, the method will attempt to cast the array data from the file schema
+    /// to the table schema where possible.
+    ///
+    /// Returns a [`SchemaMapping`] that can be applied to the output batch
+    /// along with an ordered list of columns to project from the file
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+        let mut projection = Vec::with_capacity(file_schema.fields().len());
+        let mut field_mappings = vec![None; self.table_schema.fields().len()];
+
+        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+            if let Some((table_idx, table_field)) =
+                self.table_schema.fields().find(file_field.name())
+            {
+                match can_cast_types(file_field.data_type(), table_field.data_type()) {
+                    true => {
+                        field_mappings[table_idx] = Some(projection.len());
+                        projection.push(file_idx);
+                    }
+                    false => {
+                        return plan_err!(
+                            "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
+                            file_field.name(),
+                            file_field.data_type(),
+                            table_field.data_type()
+                        )
+                    }
+                }
+            }
+        }
+
+        Ok((
+            Arc::new(SchemaMapping {
+                table_schema: self.table_schema.clone(),
+                field_mappings,
+            }),
+            projection,
+        ))
+    }
+}
+
+/// The SchemaMapping struct holds a mapping from the file schema to the table schema
+/// and any necessary type conversions that need to be applied.
+#[derive(Debug)]
+pub struct SchemaMapping {
+    /// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result.
+    table_schema: SchemaRef,
+    /// Mapping from field index in `table_schema` to index in projected file_schema
+    field_mappings: Vec<Option<usize>>,
+}
+
+impl SchemaMapper for SchemaMapping {
+    /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
+    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
+        let batch_rows = batch.num_rows();
+        let batch_cols = batch.columns().to_vec();
+
+        let cols = self
+            .table_schema
+            .fields()
+            .iter()
+            .zip(&self.field_mappings)
+            .map(|(field, file_idx)| match file_idx {
+                Some(batch_idx) => cast(&batch_cols[*batch_idx], field.data_type()),
+                None => Ok(new_null_array(field.data_type(), batch_rows)),
+            })
+            .collect::<datafusion_common::Result<Vec<_>, _>>()?;
+
+        // Necessary to handle empty batches
+        let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
+
+        let schema = self.table_schema.clone();
+        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
+        Ok(record_batch)
+    }
+}
+
+#[cfg(test)]

Review Comment:
   Moving the tests to be alongside the schema mapper makes a lot of sense
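To make the extension point concrete, here is a minimal sketch of a custom factory written against the traits in this diff. It assumes the module ends up reachable as `datafusion::datasource::schema_adapter` and that `datafusion-common` is a dependency; the `PassThrough*` names are invented for illustration, and the mapper deliberately skips the casting and null-filling a real implementation (like `SchemaMapping` above) would perform.

use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
use datafusion::datasource::schema_adapter::{
    SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
};

/// Hypothetical factory: hands out adapters that only select columns by name.
#[derive(Debug)]
struct PassThroughAdapterFactory;

impl SchemaAdapterFactory for PassThroughAdapterFactory {
    fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
        Box::new(PassThroughAdapter { table_schema })
    }
}

struct PassThroughAdapter {
    table_schema: SchemaRef,
}

impl SchemaAdapter for PassThroughAdapter {
    // Stage 1 helper: look the table column up by name in the file schema.
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        let name = self.table_schema.field(index).name();
        file_schema.fields.find(name).map(|(idx, _)| idx)
    }

    // Stage 1: project every file column that also exists in the table schema.
    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        let projection = file_schema
            .fields()
            .iter()
            .enumerate()
            .filter(|(_, field)| self.table_schema.fields.find(field.name()).is_some())
            .map(|(idx, _)| idx)
            .collect();
        Ok((Arc::new(PassThroughMapper), projection))
    }
}

struct PassThroughMapper;

impl SchemaMapper for PassThroughMapper {
    // Stage 2: a real adapter would cast columns and insert null arrays for
    // table columns the file lacks; this sketch returns the batch untouched.
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
        Ok(batch)
    }
}

The two-stage split is the point of the API: `map_schema` runs once per file before any data is read, so the reader can prune columns, while `map_batch` runs on every `RecordBatch` the reader produces.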
