alamb commented on code in PR #10680:
URL: https://github.com/apache/datafusion/pull/10680#discussion_r1615806596


##########
datafusion/core/src/datasource/schema_adapter.rs:
##########
@@ -0,0 +1,337 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Schema Adapter provides a method of translating the RecordBatches that come out of the
+//! physical format into how they should be used by DataFusion. For instance, a schema
+//! can be stored external to a parquet file that maps parquet logical types to arrow types.
+
+use arrow::compute::{can_cast_types, cast};
+use arrow_array::{new_null_array, RecordBatch, RecordBatchOptions};
+use arrow_schema::{Schema, SchemaRef};
+use datafusion_common::plan_err;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Factory of schema adapters.
+///
+/// Provides means to implement custom schema adaptation.
+pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
+    /// Provides `SchemaAdapter`.
+    fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
+}
+
+/// A utility which can adapt file-level record batches to a table schema that may have been
+/// obtained by merging multiple file-level schemas.
+///
+/// This is useful for enabling schema evolution in partitioned datasets.
+///
+/// This has to be done in two stages.
+///
+/// 1. Before reading the file, we have to map projected column indexes from the table schema to
+///    the file schema.
+///
+/// 2. After reading a record batch we need to map the read columns back to the expected column
+///    indexes and insert null-valued columns wherever the file schema was missing a column present
+///    in the table schema.
+pub trait SchemaAdapter: Send + Sync {
+    /// Map a column index in the table schema to a column index in a particular
+    /// file schema
+    ///
+    /// Panics if index is not in range for the table schema
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;
+
+    /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file
+    /// schema to the table schema.
+    ///
+    /// If the provided `file_schema` contains columns of a different type to the expected
+    /// `table_schema`, the method will attempt to cast the array data from the file schema
+    /// to the table schema where possible.
+    ///
+    /// Returns a [`SchemaMapper`] that can be applied to the output batch
+    /// along with an ordered list of columns to project from the file
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
+}
+
+/// Transforms a RecordBatch from the physical layer to a RecordBatch that matches the table schema.
+pub trait SchemaMapper: Send + Sync {
+    /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
+    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
+}
+
+#[derive(Clone, Debug, Default)]
+pub(crate) struct DefaultSchemaAdapterFactory {}
+
+impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
+    fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+        Box::new(DefaultSchemaAdapter { table_schema })
+    }
+}
+
+#[derive(Clone, Debug)]
+pub(crate) struct DefaultSchemaAdapter {
+    /// Schema for the table
+    table_schema: SchemaRef,
+}
+
+impl SchemaAdapter for DefaultSchemaAdapter {
+    /// Map a column index in the table schema to a column index in a particular
+    /// file schema
+    ///
+    /// Panics if index is not in range for the table schema
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+        let field = self.table_schema.field(index);
+        Some(file_schema.fields.find(field.name())?.0)
+    }
+
+    /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file
+    /// schema to the table schema.
+    ///
+    /// If the provided `file_schema` contains columns of a different type to the expected
+    /// `table_schema`, the method will attempt to cast the array data from the file schema
+    /// to the table schema where possible.
+    ///
+    /// Returns a [`SchemaMapping`] that can be applied to the output batch
+    /// along with an ordered list of columns to project from the file
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+        let mut projection = Vec::with_capacity(file_schema.fields().len());
+        let mut field_mappings = vec![None; self.table_schema.fields().len()];
+
+        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+            if let Some((table_idx, table_field)) =
+                self.table_schema.fields().find(file_field.name())
+            {
+                match can_cast_types(file_field.data_type(), table_field.data_type()) {
+                    true => {
+                        field_mappings[table_idx] = Some(projection.len());
+                        projection.push(file_idx);
+                    }
+                    false => {
+                        return plan_err!(
+                            "Cannot cast file schema field {} of type {:?} to 
table schema field of type {:?}",
+                            file_field.name(),
+                            file_field.data_type(),
+                            table_field.data_type()
+                        )
+                    }
+                }
+            }
+        }
+
+        Ok((
+            Arc::new(SchemaMapping {
+                table_schema: self.table_schema.clone(),
+                field_mappings,
+            }),
+            projection,
+        ))
+    }
+}
+
+/// The SchemaMapping struct holds a mapping from the file schema to the table schema
+/// and any necessary type conversions that need to be applied.
+#[derive(Debug)]
+pub struct SchemaMapping {
+    /// The schema of the table. This is the expected schema after conversion and it should
+    /// match the schema of the query result.
+    table_schema: SchemaRef,
+    /// Mapping from field index in `table_schema` to index in projected file_schema
+    field_mappings: Vec<Option<usize>>,
+}
+
+impl SchemaMapper for SchemaMapping {
+    /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
+    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
+        let batch_rows = batch.num_rows();
+        let batch_cols = batch.columns().to_vec();
+
+        let cols = self
+            .table_schema
+            .fields()
+            .iter()
+            .zip(&self.field_mappings)
+            .map(|(field, file_idx)| match file_idx {
+                Some(batch_idx) => cast(&batch_cols[*batch_idx], field.data_type()),
+                None => Ok(new_null_array(field.data_type(), batch_rows)),
+            })
+            .collect::<datafusion_common::Result<Vec<_>, _>>()?;
+
+        // Necessary to handle empty batches
+        let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
+
+        let schema = self.table_schema.clone();
+        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
+        Ok(record_batch)
+    }
+}
+
+#[cfg(test)]

Review Comment:
   Moving the tests to be alongside the schema mapper makes a lot of sense
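
   As a further illustration of how the three traits compose, here is a minimal pass-through sketch (hypothetical names, not part of this patch) that assumes the file schema already matches the table schema:

```rust
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};

/// Hypothetical factory: hands out adapters that assume the file schema
/// already matches the table schema, so no casting or reordering is needed.
#[derive(Debug)]
struct PassThroughAdapterFactory;

impl SchemaAdapterFactory for PassThroughAdapterFactory {
    fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
        Box::new(PassThroughAdapter { table_schema })
    }
}

#[derive(Debug)]
struct PassThroughAdapter {
    table_schema: SchemaRef,
}

impl SchemaAdapter for PassThroughAdapter {
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        // Stage 1: resolve the table column by name in the file schema.
        let field = self.table_schema.field(index);
        Some(file_schema.fields.find(field.name())?.0)
    }

    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        // Project every file column in its original order; the mapper
        // below is then a no-op.
        let projection = (0..file_schema.fields().len()).collect();
        Ok((Arc::new(PassThroughMapper {}), projection))
    }
}

#[derive(Debug)]
struct PassThroughMapper {}

impl SchemaMapper for PassThroughMapper {
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
        // Stage 2 is where `DefaultSchemaAdapter` casts, reorders, and
        // null-fills; the pass-through returns the batch unchanged.
        Ok(batch)
    }
}
```

   The `DefaultSchemaAdapter` in this file does the real work for the common case: name-based matching plus `can_cast_types`/`cast`, with columns missing from the file filled via `new_null_array`.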


