Re: [PR] Implement schema adapter support for FileSource and add integration tests [datafusion]
alamb merged PR #16148: URL: https://github.com/apache/datafusion/pull/16148 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] Implement schema adapter support for FileSource and add integration tests [datafusion]
alamb commented on code in PR #16148:
URL: https://github.com/apache/datafusion/pull/16148#discussion_r2115794350
##
datafusion/datasource/src/test_util.rs:
##
@@ -81,6 +83,8 @@ impl FileSource for MockSource {
fn file_type(&self) -> &str {
"mock"
}
+
+impl_schema_adapter_methods!();
Review Comment:
Here is my proposal for how to handle this API:
- https://github.com/apache/datafusion/pull/16214
I think it is fairly simple and follows the existing pattern in this codebase
Re: [PR] Implement schema adapter support for FileSource and add integration tests [datafusion]
kosiew commented on code in PR #16148:
URL: https://github.com/apache/datafusion/pull/16148#discussion_r2113115289
##
datafusion/datasource/src/test_util.rs:
##
@@ -81,6 +83,8 @@ impl FileSource for MockSource {
fn file_type(&self) -> &str {
"mock"
}
+
+impl_schema_adapter_methods!();
Review Comment:
@alamb, @adriangb,
Thanks for your discussion on this.
What do you think about implementing the schema adapter support via an
[opt-in
trait](https://github.com/apache/datafusion/pull/15295#discussion_r2100959986)
to avoid [breaking
changes](https://github.com/apache/datafusion/pull/16148#discussion_r2110769126)
in the `FileSource` trait?
Re: [PR] Implement schema adapter support for FileSource and add integration tests [datafusion]
alamb commented on code in PR #16148:
URL: https://github.com/apache/datafusion/pull/16148#discussion_r2112282534
##
datafusion/datasource/src/test_util.rs:
##
@@ -81,6 +83,8 @@ impl FileSource for MockSource {
fn file_type(&self) -> &str {
"mock"
}
+
+impl_schema_adapter_methods!();
Review Comment:
This would be my preference too -- the `macro` is a nice way to reduce the
boilerplate (and @kosiew has documented it super well), but I think it
adds a bit more cognitive load, and it would be better to have a little more
duplication to be explicit.
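To make the trade-off concrete, here is a minimal sketch using stand-in types rather than the real DataFusion traits. `Source`, `Factory`, `impl_factory_methods!`, `MacroSource` and `ExplicitSource` are all hypothetical names. The macro is not the one from this PR; it only assumes a `Clone` type with a `factory` field.

```rust
use std::sync::Arc;

/// Stand-in for a schema adapter factory (hypothetical).
trait Factory: Send + Sync {}

struct NoopFactory;
impl Factory for NoopFactory {}

/// Stand-in for `FileSource` (hypothetical, heavily simplified).
trait Source: Send + Sync {
    fn batch_size(&self) -> usize;
    fn with_factory(&self, factory: Arc<dyn Factory>) -> Arc<dyn Source>;
    fn factory(&self) -> Option<Arc<dyn Factory>>;
}

/// Hypothetical macro: expands to the two factory methods inside an `impl`
/// block. It assumes the type is `Clone` and has a field named `factory`.
macro_rules! impl_factory_methods {
    () => {
        fn with_factory(&self, factory: Arc<dyn Factory>) -> Arc<dyn Source> {
            Arc::new(Self {
                factory: Some(factory),
                ..self.clone()
            })
        }

        fn factory(&self) -> Option<Arc<dyn Factory>> {
            self.factory.clone()
        }
    };
}

#[derive(Clone)]
struct MacroSource {
    batch_size: usize,
    factory: Option<Arc<dyn Factory>>,
}

impl Source for MacroSource {
    fn batch_size(&self) -> usize {
        self.batch_size
    }

    // A reader has to look up the macro to see which methods this adds.
    impl_factory_methods!();
}

#[derive(Clone)]
struct ExplicitSource {
    batch_size: usize,
    factory: Option<Arc<dyn Factory>>,
}

impl Source for ExplicitSource {
    fn batch_size(&self) -> usize {
        self.batch_size
    }

    // Same behavior written out by hand: more duplication, nothing hidden.
    fn with_factory(&self, factory: Arc<dyn Factory>) -> Arc<dyn Source> {
        Arc::new(Self {
            factory: Some(factory),
            ..self.clone()
        })
    }

    fn factory(&self) -> Option<Arc<dyn Factory>> {
        self.factory.clone()
    }
}
```

Both types behave identically; the difference is only in whether the two methods are visible at the implementation site.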
Re: [PR] Implement schema adapter support for FileSource and add integration tests [datafusion]
adriangb commented on code in PR #16148:
URL: https://github.com/apache/datafusion/pull/16148#discussion_r2112246851
##
datafusion/datasource/src/test_util.rs:
##
@@ -81,6 +83,8 @@ impl FileSource for MockSource {
fn file_type(&self) -> &str {
"mock"
}
+
+impl_schema_adapter_methods!();
Review Comment:
It does it via macros, right? I'm basically saying that instead of providing
a macro that implements the functions for you, I would force users to implement
the functions and (if necessary) provide helpers they can call from within
their implementation.
Re: [PR] Implement schema adapter support for FileSource and add integration tests [datafusion]
alamb commented on code in PR #16148:
URL: https://github.com/apache/datafusion/pull/16148#discussion_r2112237909
##
datafusion/datasource/src/test_util.rs:
##
@@ -81,6 +83,8 @@ impl FileSource for MockSource {
fn file_type(&self) -> &str {
"mock"
}
+
+impl_schema_adapter_methods!();
Review Comment:
This is what this PR does, right? Or are you suggesting a change?
Re: [PR] Implement schema adapter support for FileSource and add integration tests [datafusion]
adriangb commented on code in PR #16148:
URL: https://github.com/apache/datafusion/pull/16148#discussion_r2112147587
##
datafusion/datasource/src/test_util.rs:
##
@@ -81,6 +83,8 @@ impl FileSource for MockSource {
fn file_type(&self) -> &str {
"mock"
}
+
+impl_schema_adapter_methods!();
Review Comment:
Then I think we should do that. We can provide helper methods e.g.
`no_op_method_name` such that users can do:
```rust
fn method_that_I_now_need_to_implement(&self, ...) -> ... {
    no_op_helper(self)
}
```
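A slightly fuller sketch of that idea, under stated assumptions: `Source` and `Factory` are simplified stand-ins for the real DataFusion traits, and `no_op_helper` and `PlainSource` are hypothetical. Because the trait methods have no defaults, an implementor that omits them gets a compile error, and the helper keeps the required code to one line.

```rust
use std::sync::Arc;

/// Stand-in for a schema adapter factory (hypothetical).
trait Factory: Send + Sync {}

struct NoopFactory;
impl Factory for NoopFactory {}

/// Stand-in for `FileSource`. Both adapter methods are required (no
/// defaults), so an implementor that omits them fails to compile.
trait Source: Send + Sync {
    fn file_type(&self) -> &str;
    fn with_schema_adapter_factory(&self, factory: Arc<dyn Factory>) -> Arc<dyn Source>;
    fn schema_adapter_factory(&self) -> Option<Arc<dyn Factory>>;
}

/// Hypothetical helper for sources that ignore schema adapters: it returns
/// an unchanged copy of the source. The `Clone` bound lives on the helper,
/// not on the trait, so the trait stays usable as `dyn Source`.
fn no_op_helper<S>(source: &S) -> Arc<dyn Source>
where
    S: Source + Clone + 'static,
{
    Arc::new(source.clone())
}

#[derive(Clone)]
struct PlainSource;

impl Source for PlainSource {
    fn file_type(&self) -> &str {
        "plain"
    }

    // The explicit one-liner a user would write when upgrading.
    fn with_schema_adapter_factory(&self, _factory: Arc<dyn Factory>) -> Arc<dyn Source> {
        no_op_helper(self)
    }

    fn schema_adapter_factory(&self) -> Option<Arc<dyn Factory>> {
        None
    }
}
```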
Re: [PR] Implement schema adapter support for FileSource and add integration tests [datafusion]
adriangb commented on code in PR #16148:
URL: https://github.com/apache/datafusion/pull/16148#discussion_r2112148579
##
datafusion/datasource/src/test_util.rs:
##
@@ -81,6 +83,8 @@ impl FileSource for MockSource {
fn file_type(&self) -> &str {
"mock"
}
+
+impl_schema_adapter_methods!();
Review Comment:
I think it's better to ask users to implement a 1 liner with clear
instructions in the upgrade guide and helpers to make the code simple than it
is to try to hide that from them.
Re: [PR] Implement schema adapter support for FileSource and add integration tests [datafusion]
alamb commented on code in PR #16148:
URL: https://github.com/apache/datafusion/pull/16148#discussion_r2112141978
##
datafusion/datasource/src/test_util.rs:
##
@@ -81,6 +83,8 @@ impl FileSource for MockSource {
fn file_type(&self) -> &str {
"mock"
}
+
+impl_schema_adapter_methods!();
Review Comment:
> compile error because of the missing methods and thus be forced to make
> them a no-op or unimplemented!()?
Yes, that is my understanding as well.
Re: [PR] Implement schema adapter support for FileSource and add integration tests [datafusion]
adriangb commented on code in PR #16148:
URL: https://github.com/apache/datafusion/pull/16148#discussion_r2110769126
##
datafusion/datasource/src/test_util.rs:
##
@@ -81,6 +83,8 @@ impl FileSource for MockSource {
fn file_type(&self) -> &str {
"mock"
}
+
+impl_schema_adapter_methods!();
Review Comment:
If we implement the new methods for all structs in DataFusion, won't users
who upgrade get a compile error because of the missing methods and thus be
forced to make them a no-op or `unimplemented!()`? That seems reasonable to me.
Re: [PR] Implement schema adapter support for FileSource and add integration tests [datafusion]
kosiew commented on PR #16148: URL: https://github.com/apache/datafusion/pull/16148#issuecomment-2914548073 Thank you @alamb for the review and feedback.
Re: [PR] Implement schema adapter support for FileSource and add integration tests [datafusion]
alamb commented on code in PR #16148:
URL: https://github.com/apache/datafusion/pull/16148#discussion_r2109906320
##
datafusion/core/tests/test_adapter_updated.rs:
##
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
Review Comment:
Due to
1. The previously mentioned reason that more targets result in longer
compile times
2. It will be harder to find tests for the same feature if they are in
different files
I think we should remove this new file
`datafusion/core/tests/test_adapter_updated.rs` and put the test in
`datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs`
instead.
However, we can do that as a follow-on PR too.
##
datafusion/datasource/src/macros.rs:
##
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Macros for the datafusion-datasource crate
+
+/// Helper macro to generate schema adapter methods for FileSource implementations
Review Comment:
This is super well documented. I am still not a huge fan of adding this new
macro, as I think it makes implementing DataSources that much more complicated,
but I can see the rationale for not adding a default in
https://github.com/apache/datafusion/pull/16148#discussion_r2106508912
So let's go with this approach and see how it goes.
##
datafusion/datasource/src/test_util.rs:
##
@@ -81,6 +83,8 @@ impl FileSource for MockSource {
fn file_type(&self) -> &str {
"mock"
}
+
+impl_schema_adapter_methods!();
Review Comment:
> It forces every callsite, even the 99% that never “fail”, to handle a
> Result. That’s a lot of boilerplate up front.
In my opinion the boilerplate would be relatively minimal (it would likely
require `?` in a few places).
> Pushing the “not implemented” case to runtime means we only discover
> missing overrides via panics or errors in production, instead of compile-time
> feedback.
This argument makes sense to me. However, another drawback is that after
this PR, `DataSource` implementations are required to handle schema
adapters, rather than being allowed to return a runtime error if they
don't.
I think that is probably fine, but in the future I can still see a use case
for a fallible `Result`.
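As a rough illustration of the fallible variant discussed here, the sketch below uses simplified stand-in types, not the DataFusion API. `Source`, `Factory`, `SourceError`, `LegacySource`, `AdaptingSource` and `configure` are all hypothetical. It shows how a default that returns an error lets a source opt out at runtime, and that a call site typically only needs `?`.

```rust
use std::sync::Arc;

/// Hypothetical error type standing in for a real error enum.
#[derive(Debug, PartialEq)]
enum SourceError {
    NotImplemented(String),
}

/// Stand-in for a schema adapter factory (hypothetical).
trait Factory: Send + Sync {}

struct NoopFactory;
impl Factory for NoopFactory {}

/// Stand-in for `FileSource` with a fallible adapter method.
trait Source: Send + Sync {
    fn file_type(&self) -> &str;

    /// Fallible default: sources that do not override this report the
    /// missing support as a runtime error instead of a compile error.
    fn with_factory(&self, _factory: Arc<dyn Factory>) -> Result<Arc<dyn Source>, SourceError> {
        Err(SourceError::NotImplemented(format!(
            "{} does not support schema adapters",
            self.file_type()
        )))
    }
}

/// Relies on the default and therefore rejects adapters at runtime.
struct LegacySource;

impl Source for LegacySource {
    fn file_type(&self) -> &str {
        "legacy"
    }
}

/// Overrides the default and stores the factory.
struct AdaptingSource {
    factory: Option<Arc<dyn Factory>>,
}

impl Source for AdaptingSource {
    fn file_type(&self) -> &str {
        "adapting"
    }

    fn with_factory(&self, factory: Arc<dyn Factory>) -> Result<Arc<dyn Source>, SourceError> {
        Ok(Arc::new(AdaptingSource {
            factory: Some(factory),
        }))
    }
}

/// A call site: the only extra boilerplate is the `?`.
fn configure(source: &dyn Source, factory: Arc<dyn Factory>) -> Result<String, SourceError> {
    let updated = source.with_factory(factory)?;
    Ok(format!("{} source configured", updated.file_type()))
}
```

The cost named in the quoted objection is visible too: `LegacySource` compiles without complaint, and the gap only surfaces when `configure` is actually called.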
Re: [PR] Implement schema adapter support for FileSource and add integration tests [datafusion]
kosiew commented on code in PR #16148:
URL: https://github.com/apache/datafusion/pull/16148#discussion_r2106508912
##
datafusion/datasource/src/test_util.rs:
##
@@ -81,6 +83,8 @@ impl FileSource for MockSource {
fn file_type(&self) -> &str {
"mock"
}
+
+impl_schema_adapter_methods!();
Review Comment:
Hey, really appreciate the suggestion! Turning these two methods into trait
defaults is
[tempting](https://github.com/apache/datafusion/pull/16148/commits/ee07b69fbcc0a41bd2a859f5f0fa328b35d4e69d),
but we run into some frustrating object-safety and cloning issues:
1. Fallible signature (Result)
Switching to
```rust
fn with_schema_adapter_factory(...) -> Result<Arc<dyn FileSource>, _>
```
has drawbacks:
- It forces every callsite, even the 99% that never “fail”, to handle a Result.
That’s a lot of boilerplate up front.
- Pushing the “not implemented” case to runtime means we only discover missing
overrides via panics or errors in production, instead of compile-time feedback.
2. Trait-object vs. Clone
A default like
```rust
fn with_schema_adapter_factory(
    &self,
    _factory: Arc<dyn SchemaAdapterFactory>,
) -> Arc<dyn FileSource> {
    Arc::new(self.clone())
}
```
can’t compile because:
- `self` is a `&Self`, so `self.clone()` gives you another `&Self`, producing an
`Arc<&Self>`, and `&Self` isn’t a `FileSource`.
- To make it work you’d need `Self: Sized + Clone` on the default, but then that
method isn’t even available on `dyn FileSource`, defeating trait-object use.
So, I am leaning towards keeping the macro because:
- Object-safe clone: by generating Arc::new(Self { …, ..self.clone() })
inside each impl, the macro leverages the concrete type’s Clone impl without
polluting the trait itself.
- Single maintenance point: if we ever tweak the method signature, we update
the macro once and every impl site gets fixed automatically.
- Compile-time assurance: missing impl_schema_adapter_methods!() on a type
immediately fails to compile, alerting the author they need to opt in.
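The object-safety point can be sketched with simplified stand-in types. `Source`, `Factory`, `MockSource`, `cloned` and `attach` below are hypothetical and much smaller than the real `FileSource` trait. The sketch shows that a clone-based default needs `Self: Sized + Clone`, which removes it from `dyn Source`, while a required method implemented per type stays callable on trait objects.

```rust
use std::sync::Arc;

/// Stand-in for a schema adapter factory (hypothetical).
trait Factory: Send + Sync {}

struct NoopFactory;
impl Factory for NoopFactory {}

/// Stand-in for `FileSource` (hypothetical, heavily simplified).
trait Source: Send + Sync {
    fn file_type(&self) -> &str;
    fn has_factory(&self) -> bool;

    /// Required method with no `Self: Sized` bound, so it can be called
    /// through `Arc<dyn Source>`. Each concrete type supplies the clone.
    fn with_factory(&self, factory: Arc<dyn Factory>) -> Arc<dyn Source>;

    /// A clone-based default only compiles with these bounds, and the
    /// `Self: Sized` bound makes it uncallable on `dyn Source`.
    fn cloned(&self) -> Arc<dyn Source>
    where
        Self: Sized + Clone + 'static,
    {
        Arc::new(self.clone())
    }
}

#[derive(Clone)]
struct MockSource {
    batch_size: usize,
    factory: Option<Arc<dyn Factory>>,
}

impl Source for MockSource {
    fn file_type(&self) -> &str {
        "mock"
    }

    fn has_factory(&self) -> bool {
        self.factory.is_some()
    }

    // The concrete type knows it is `Clone`, so the trait needs no bound.
    fn with_factory(&self, factory: Arc<dyn Factory>) -> Arc<dyn Source> {
        Arc::new(Self {
            factory: Some(factory),
            ..self.clone()
        })
    }
}

/// Works on a trait object because `with_factory` is object-safe.
fn attach(source: Arc<dyn Source>) -> Arc<dyn Source> {
    // `source.cloned()` would be rejected here: `dyn Source` is not `Sized`.
    source.with_factory(Arc::new(NoopFactory))
}
```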
Re: [PR] Implement schema adapter support for FileSource and add integration tests [datafusion]
kosiew commented on code in PR #16148:
URL: https://github.com/apache/datafusion/pull/16148#discussion_r2106461041
##
datafusion/datasource-parquet/tests/apply_schema_adapter_tests.rs:
##
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod parquet_adapter_tests {
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{ColumnStatistics, DataFusionError, Result};
+use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_scan_config::{
+FileScanConfig, FileScanConfigBuilder,
+};
+use datafusion_datasource::schema_adapter::{
+SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
+};
+use datafusion_datasource_parquet::source::ParquetSource;
+use datafusion_execution::object_store::ObjectStoreUrl;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// A test schema adapter factory that adds prefix to column names
+#[derive(Debug)]
+struct PrefixAdapterFactory {
+prefix: String,
+}
+
+impl SchemaAdapterFactory for PrefixAdapterFactory {
+fn create(
+&self,
+projected_table_schema: SchemaRef,
+_table_schema: SchemaRef,
+) -> Box<dyn SchemaAdapter> {
+Box::new(PrefixAdapter {
+input_schema: projected_table_schema,
+prefix: self.prefix.clone(),
+})
+}
+}
+
+/// A test schema adapter that adds prefix to column names
+#[derive(Debug)]
+struct PrefixAdapter {
+input_schema: SchemaRef,
+prefix: String,
+}
+
+impl SchemaAdapter for PrefixAdapter {
+fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+let field = self.input_schema.field(index);
+file_schema.fields.find(field.name()).map(|(i, _)| i)
+}
+
+fn map_schema(
+&self,
+file_schema: &Schema,
+) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+let mut projection = Vec::with_capacity(file_schema.fields().len());
+for (file_idx, file_field) in file_schema.fields().iter().enumerate() {
+if self.input_schema.fields().find(file_field.name()).is_some() {
+projection.push(file_idx);
+}
+}
+
+// Create a schema mapper that adds a prefix to column names
+#[derive(Debug)]
+struct PrefixSchemaMapping {
+// Keep only the prefix field which is actually used in the implementation
+prefix: String,
+}
+
+impl SchemaMapper for PrefixSchemaMapping {
+fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
+// Create a new schema with prefixed field names
+let prefixed_fields: Vec<Field> = batch
+.schema()
+.fields()
+.iter()
+.map(|field| {
+Field::new(
+format!("{}{}", self.prefix, field.name()),
+field.data_type().clone(),
+field.is_nullable(),
+)
+})
+.collect();
+let prefixed_schema = Arc::new(Schema::new(prefixed_fields));
+
+// Create a new batch with the prefixed schema but the same data
+let options = arrow::record_batch::RecordBatchOptions::default();
+RecordBatch::try_new_with_options(
+prefixed_schema,
+batch.columns().to_vec(),
+&options,
+)
+.map_err(|e| DataFusionError::ArrowError(e, None))
+}
+
+fn map_column_statistics(
+&self,
+stats: &[ColumnStatistics],
+) -> Result<Vec<ColumnStatistics>> {
+// For testing, just return the input statistics
+Ok(stats.to_vec())
+
Re: [PR] Implement schema adapter support for FileSource and add integration tests [datafusion]
kosiew commented on code in PR #16148:
URL: https://github.com/apache/datafusion/pull/16148#discussion_r2106445116
##
datafusion/core/tests/test_source_adapter_tests.rs:
##
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion_common::{ColumnStatistics, DataFusionError, Result, Statistics};
+use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_scan_config::FileScanConfig;
+use datafusion_datasource::file_stream::FileOpener;
+use datafusion_datasource::impl_schema_adapter_methods;
+use datafusion_datasource::schema_adapter::{
+SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
+};
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
+use object_store::ObjectStore;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+// Simple TestSource implementation for testing without dependency on private module
+#[derive(Clone, Debug)]
+struct TestSource {
+#[allow(dead_code)]
+has_adapter: bool,
+schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+}
+
+impl TestSource {
+fn new(has_adapter: bool) -> Self {
+Self {
+has_adapter,
+schema_adapter_factory: None,
+}
+}
+}
+
+impl FileSource for TestSource {
+fn file_type(&self) -> &str {
+"test"
+}
+
+fn as_any(&self) -> &dyn std::any::Any {
+self
+}
+
+fn create_file_opener(
+&self,
+_store: Arc<dyn ObjectStore>,
+_conf: &FileScanConfig,
+_index: usize,
+) -> Arc<dyn FileOpener> {
+unimplemented!("Not needed for this test")
+}
+
+fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+Arc::new(self.clone())
+}
+
+fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
+Arc::new(self.clone())
+}
+
+fn with_projection(&self, _projection: &FileScanConfig) -> Arc<dyn FileSource> {
+Arc::new(self.clone())
+}
+
+fn with_statistics(&self, _statistics: Statistics) -> Arc<dyn FileSource> {
+Arc::new(self.clone())
+}
+
+fn metrics(&self) -> &ExecutionPlanMetricsSet {
+unimplemented!("Not needed for this test")
+}
+
+fn statistics(&self) -> Result<Statistics> {
+Ok(Statistics::default())
+}
+
+impl_schema_adapter_methods!();
+}
+
+impl DisplayAs for TestSource {
+fn fmt_as(
+&self,
+t: DisplayFormatType,
+f: &mut std::fmt::Formatter,
+) -> std::fmt::Result {
+match t {
+DisplayFormatType::Default
+| DisplayFormatType::Verbose
+| DisplayFormatType::TreeRender => {
+write!(f, "TestSource")
+}
+}
+}
+}
+
+/// A simple schema adapter factory for testing
+#[derive(Debug)]
+struct TestFilterPushdownAdapterFactory {}
+
+impl SchemaAdapterFactory for TestFilterPushdownAdapterFactory {
+fn create(
+&self,
+projected_table_schema: SchemaRef,
+_table_schema: SchemaRef,
+) -> Box<dyn SchemaAdapter> {
+Box::new(TestFilterPushdownAdapter {
+input_schema: projected_table_schema,
+})
+}
+}
+
+/// A simple schema adapter for testing
+#[derive(Debug)]
+struct TestFilterPushdownAdapter {
+input_schema: SchemaRef,
+}
+
+impl SchemaAdapter for TestFilterPushdownAdapter {
+fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+let field = self.input_schema.field(index);
+file_schema.fields.find(field.name()).map(|(i, _)| i)
+}
+
+fn map_schema(
+&self,
+file_schema: &Schema,
+) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+let mut projection = Vec::with_capacity(file_schema.fields().len());
+for (file_idx, file_field) in file_schema.fields().iter().enumerate() {
+if self.input_schema.fields().find(file_field.name()).is_some() {
+projection.push(file_idx);
+}
+}
+
+// Create a schema mapper that modifies column names
+#[derive(Debug)]
+struct TestSchemaMapping {
+#[allow(dead_code)]
+input_schema: SchemaRef,
+}
+
+impl SchemaMapper for TestSchemaMapping {
+f
Re: [PR] Implement schema adapter support for FileSource and add integration tests [datafusion]
kosiew commented on code in PR #16148:
URL: https://github.com/apache/datafusion/pull/16148#discussion_r2106439691
##
datafusion/core/tests/test_adapter_updated.rs:
##
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{ColumnStatistics, DataFusionError, Result, Statistics};
+use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_scan_config::FileScanConfig;
+use datafusion_datasource::file_stream::FileOpener;
+use datafusion_datasource::schema_adapter::{
+SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
+};
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use object_store::ObjectStore;
+use std::any::Any;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// A test source for testing schema adapters
+#[derive(Debug, Clone)]
+struct TestSource {
+schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+}
+
+impl TestSource {
+fn new() -> Self {
+Self {
+schema_adapter_factory: None,
+}
+}
+}
+
+impl FileSource for TestSource {
+fn file_type(&self) -> &str {
+"test"
+}
+
+fn as_any(&self) -> &dyn Any {
+self
+}
+
+fn create_file_opener(
+&self,
+_store: Arc<dyn ObjectStore>,
+_conf: &FileScanConfig,
+_index: usize,
+) -> Arc<dyn FileOpener> {
+unimplemented!("Not needed for this test")
+}
+
+fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+Arc::new(self.clone())
+}
+
+fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
+Arc::new(self.clone())
+}
+
+fn with_projection(&self, _projection: &FileScanConfig) -> Arc<dyn FileSource> {
+Arc::new(self.clone())
+}
+
+fn with_statistics(&self, _statistics: Statistics) -> Arc<dyn FileSource> {
+Arc::new(self.clone())
+}
+
+fn metrics(&self) -> &ExecutionPlanMetricsSet {
+unimplemented!("Not needed for this test")
+}
+
+fn statistics(&self) -> Result<Statistics> {
+Ok(Statistics::default())
+}
+
+fn with_schema_adapter_factory(
+&self,
+schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+) -> Arc<dyn FileSource> {
+Arc::new(Self {
+schema_adapter_factory: Some(schema_adapter_factory),
+})
+}
+
+fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
+self.schema_adapter_factory.clone()
+}
+}
+
+/// A test schema adapter factory
+#[derive(Debug)]
+struct TestSchemaAdapterFactory {}
+
+impl SchemaAdapterFactory for TestSchemaAdapterFactory {
+fn create(
+&self,
+projected_table_schema: SchemaRef,
+_table_schema: SchemaRef,
+) -> Box<dyn SchemaAdapter> {
+Box::new(TestSchemaAdapter {
+table_schema: projected_table_schema,
+})
+}
+}
+
+/// A test schema adapter implementation
+#[derive(Debug)]
+struct TestSchemaAdapter {
+table_schema: SchemaRef,
+}
+
+impl SchemaAdapter for TestSchemaAdapter {
+fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+let field = self.table_schema.field(index);
+file_schema.fields.find(field.name()).map(|(i, _)| i)
+}
+
+fn map_schema(
+&self,
+file_schema: &Schema,
+) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+let mut projection = Vec::with_capacity(file_schema.fields().len());
+for (file_idx, file_field) in file_schema.fields().iter().enumerate() {
+if self.table_schema.fields().find(file_field.name()).is_some() {
+projection.push(file_idx);
+}
+}
+
+Ok((Arc::new(TestSchemaMapping {}), projection))
+}
+}
+
+/// A test schema mapper implementation
+#[derive(Debug)]
+struct TestSchemaMapping {}
+
+impl SchemaMapper for TestSchemaMapping {
+fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
+// For testing, just return the original batch
+Ok(batch)
+}
+
+fn map_column_statistics(
+&self,
+stats: &[ColumnStatistics],
+) -> Result<Vec<ColumnStatistics>> {
+// For testing, just return the input statistics
+Ok(stats.to_vec())
+}
+}
+
+#[test]
+fn test_schema_adapter() {
+// Create a test schema
+let table_schema = Arc::
Re: [PR] Implement schema adapter support for FileSource and add integration tests [datafusion]
kosiew commented on code in PR #16148:
URL: https://github.com/apache/datafusion/pull/16148#discussion_r2106439691
##
datafusion/core/tests/test_adapter_updated.rs:
##
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{ColumnStatistics, DataFusionError, Result, Statistics};
+use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_scan_config::FileScanConfig;
+use datafusion_datasource::file_stream::FileOpener;
+use datafusion_datasource::schema_adapter::{
+SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
+};
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use object_store::ObjectStore;
+use std::any::Any;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// A test source for testing schema adapters
+#[derive(Debug, Clone)]
+struct TestSource {
+schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+}
+
+impl TestSource {
+fn new() -> Self {
+Self {
+schema_adapter_factory: None,
+}
+}
+}
+
+impl FileSource for TestSource {
+fn file_type(&self) -> &str {
+"test"
+}
+
+fn as_any(&self) -> &dyn Any {
+self
+}
+
+fn create_file_opener(
+&self,
+_store: Arc<dyn ObjectStore>,
+_conf: &FileScanConfig,
+_index: usize,
+) -> Arc<dyn FileOpener> {
+unimplemented!("Not needed for this test")
+}
+
+fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+Arc::new(self.clone())
+}
+
+fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
+Arc::new(self.clone())
+}
+
+fn with_projection(&self, _projection: &FileScanConfig) -> Arc<dyn FileSource> {
+Arc::new(self.clone())
+}
+
+fn with_statistics(&self, _statistics: Statistics) -> Arc<dyn FileSource> {
+Arc::new(self.clone())
+}
+
+fn metrics(&self) -> &ExecutionPlanMetricsSet {
+unimplemented!("Not needed for this test")
+}
+
+fn statistics(&self) -> Result<Statistics> {
+Ok(Statistics::default())
+}
+
+fn with_schema_adapter_factory(
+&self,
+schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+) -> Arc<dyn FileSource> {
+Arc::new(Self {
+schema_adapter_factory: Some(schema_adapter_factory),
+})
+}
+
+fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
+self.schema_adapter_factory.clone()
+}
+}
+
+/// A test schema adapter factory
+#[derive(Debug)]
+struct TestSchemaAdapterFactory {}
+
+impl SchemaAdapterFactory for TestSchemaAdapterFactory {
+fn create(
+&self,
+projected_table_schema: SchemaRef,
+_table_schema: SchemaRef,
+) -> Box<dyn SchemaAdapter> {
+Box::new(TestSchemaAdapter {
+table_schema: projected_table_schema,
+})
+}
+}
+
+/// A test schema adapter implementation
+#[derive(Debug)]
+struct TestSchemaAdapter {
+table_schema: SchemaRef,
+}
+
+impl SchemaAdapter for TestSchemaAdapter {
+fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+let field = self.table_schema.field(index);
+file_schema.fields.find(field.name()).map(|(i, _)| i)
+}
+
+fn map_schema(
+&self,
+file_schema: &Schema,
+) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+let mut projection = Vec::with_capacity(file_schema.fields().len());
+for (file_idx, file_field) in file_schema.fields().iter().enumerate() {
+if self.table_schema.fields().find(file_field.name()).is_some() {
+projection.push(file_idx);
+}
+}
+
+Ok((Arc::new(TestSchemaMapping {}), projection))
+}
+}
+
+/// A test schema mapper implementation
+#[derive(Debug)]
+struct TestSchemaMapping {}
+
+impl SchemaMapper for TestSchemaMapping {
+fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
+// For testing, just return the original batch
+Ok(batch)
+}
+
+fn map_column_statistics(
+&self,
+stats: &[ColumnStatistics],
+) -> Result<Vec<ColumnStatistics>> {
+// For testing, just return the input statistics
+Ok(stats.to_vec())
+}
+}
+
+#[test]
+fn test_schema_adapter() {
+// Create a test schema
+let table_schema = Arc::
Re: [PR] Implement schema adapter support for FileSource and add integration tests [datafusion]
kosiew commented on code in PR #16148:
URL: https://github.com/apache/datafusion/pull/16148#discussion_r2106428855
##
datafusion/core/tests/integration_tests/schema_adapter_integration_tests.rs:
##
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration test for schema adapter factory functionality
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::datasource::physical_plan::arrow_file::ArrowSource;
+use datafusion::prelude::*;
+use datafusion_common::Result;
+use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory};
+use datafusion_datasource::source::DataSourceExec;
+use std::sync::Arc;
+use tempfile::TempDir;
+
+#[cfg(feature = "parquet")]
+use datafusion_datasource_parquet::ParquetSource;
+#[cfg(feature = "parquet")]
+use parquet::arrow::ArrowWriter;
+#[cfg(feature = "parquet")]
+use parquet::file::properties::WriterProperties;
+
+#[cfg(feature = "csv")]
+use datafusion_datasource_csv::CsvSource;
+
+/// A schema adapter factory that transforms column names to uppercase
+#[derive(Debug)]
+struct UppercaseAdapterFactory {}
+
+impl SchemaAdapterFactory for UppercaseAdapterFactory {
+fn create(&self, schema: &Schema) -> Result<Box<dyn SchemaAdapter>> {
+Ok(Box::new(UppercaseAdapter {
+input_schema: Arc::new(schema.clone()),
+}))
+}
+}
+
+/// Schema adapter that transforms column names to uppercase
+#[derive(Debug)]
+struct UppercaseAdapter {
+input_schema: SchemaRef,
+}
+
+impl SchemaAdapter for UppercaseAdapter {
+fn adapt(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
+// In a real adapter, we might transform the data too
+// For this test, we're just passing through the batch
+Ok(record_batch)
+}
+
+fn output_schema(&self) -> SchemaRef {
+let fields = self
+.input_schema
+.fields()
+.iter()
+.map(|f| {
+Field::new(
+f.name().to_uppercase().as_str(),
+f.data_type().clone(),
+f.is_nullable(),
+)
+})
+.collect();
+
+Arc::new(Schema::new(fields))
+}
+}
+
+#[cfg(feature = "parquet")]
+#[tokio::test]
+async fn test_parquet_integration_with_schema_adapter() -> Result<()> {
+// Create a temporary directory for our test file
+let tmp_dir = TempDir::new()?;
+let file_path = tmp_dir.path().join("test.parquet");
+let file_path_str = file_path.to_str().unwrap();
+
+// Create test data
+let schema = Arc::new(Schema::new(vec![
+Field::new("id", DataType::Int32, false),
+Field::new("name", DataType::Utf8, true),
+]));
+
+let batch = RecordBatch::try_new(
+schema.clone(),
+vec![
+Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])),
+Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])),
+],
+)?;
+
+// Write test parquet file
+let file = std::fs::File::create(file_path_str)?;
+let props = WriterProperties::builder().build();
+let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
+writer.write(&batch)?;
+writer.close()?;
+
+// Create a session context
+let ctx = SessionContext::new();
+
+// Create a ParquetSource with the adapter factory
+let source = ParquetSource::default()
+.with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {}));
+
+// Create a scan config
+let config = FileScanConfigBuilder::new(
+ObjectStoreUrl::parse(&format!("file://{}", file_path_str))?,
+schema.clone(),
+)
+.with_source(source)
+.build();
+
+// Create a data source executor
+let exec = DataSourceExec::from_data_source(config);
+
+// Collect results
+let task_ctx = ctx.task_ctx();
+let stream = exec.execute(0, task_ctx)?;
+let batches = datafusion::physical_plan::common::collect(stream).await?;
+
+//
Re: [PR] Implement schema adapter support for FileSource and add integration tests [datafusion]
alamb commented on PR #16148:
URL: https://github.com/apache/datafusion/pull/16148#issuecomment-2905734672
fyi @adriangb
Re: [PR] Implement schema adapter support for FileSource and add integration tests [datafusion]
alamb commented on code in PR #16148:
URL: https://github.com/apache/datafusion/pull/16148#discussion_r2105358307
##
datafusion/core/tests/test_adapter_updated.rs:
##
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{ColumnStatistics, DataFusionError, Result, Statistics};
+use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_scan_config::FileScanConfig;
+use datafusion_datasource::file_stream::FileOpener;
+use datafusion_datasource::schema_adapter::{
+SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
+};
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use object_store::ObjectStore;
+use std::any::Any;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// A test source for testing schema adapters
+#[derive(Debug, Clone)]
+struct TestSource {
+schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+}
+
+impl TestSource {
+fn new() -> Self {
+Self {
+schema_adapter_factory: None,
+}
+}
+}
+
+impl FileSource for TestSource {
+fn file_type(&self) -> &str {
+"test"
+}
+
+fn as_any(&self) -> &dyn Any {
+self
+}
+
+fn create_file_opener(
+&self,
+_store: Arc<dyn ObjectStore>,
+_conf: &FileScanConfig,
+_index: usize,
+) -> Arc<dyn FileOpener> {
+unimplemented!("Not needed for this test")
+}
+
+fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+Arc::new(self.clone())
+}
+
+fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
+Arc::new(self.clone())
+}
+
+fn with_projection(&self, _projection: &FileScanConfig) -> Arc<dyn FileSource> {
+Arc::new(self.clone())
+}
+
+fn with_statistics(&self, _statistics: Statistics) -> Arc<dyn FileSource> {
+Arc::new(self.clone())
+}
+
+fn metrics(&self) -> &ExecutionPlanMetricsSet {
+unimplemented!("Not needed for this test")
+}
+
+fn statistics(&self) -> Result<Statistics> {
+Ok(Statistics::default())
+}
+
+fn with_schema_adapter_factory(
+&self,
+schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+) -> Arc<dyn FileSource> {
+Arc::new(Self {
+schema_adapter_factory: Some(schema_adapter_factory),
+})
+}
+
+fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
+self.schema_adapter_factory.clone()
+}
+}
+
+/// A test schema adapter factory
+#[derive(Debug)]
+struct TestSchemaAdapterFactory {}
+
+impl SchemaAdapterFactory for TestSchemaAdapterFactory {
+fn create(
+&self,
+projected_table_schema: SchemaRef,
+_table_schema: SchemaRef,
+) -> Box<dyn SchemaAdapter> {
+Box::new(TestSchemaAdapter {
+table_schema: projected_table_schema,
+})
+}
+}
+
+/// A test schema adapter implementation
+#[derive(Debug)]
+struct TestSchemaAdapter {
+table_schema: SchemaRef,
+}
+
+impl SchemaAdapter for TestSchemaAdapter {
+fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+let field = self.table_schema.field(index);
+file_schema.fields.find(field.name()).map(|(i, _)| i)
+}
+
+fn map_schema(
+&self,
+file_schema: &Schema,
+) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+let mut projection = Vec::with_capacity(file_schema.fields().len());
+for (file_idx, file_field) in file_schema.fields().iter().enumerate() {
+if self.table_schema.fields().find(file_field.name()).is_some() {
+projection.push(file_idx);
+}
+}
+
+Ok((Arc::new(TestSchemaMapping {}), projection))
+}
+}
+
+/// A test schema mapper implementation
+#[derive(Debug)]
+struct TestSchemaMapping {}
+
+impl SchemaMapper for TestSchemaMapping {
+fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
+// For testing, just return the original batch
+Ok(batch)
+}
+
+fn map_column_statistics(
+&self,
+stats: &[ColumnStatistics],
+) -> Result<Vec<ColumnStatistics>> {
+// For testing, just return the input statistics
+Ok(stats.to_vec())
+}
+}
+
+#[test]
+fn test_schema_adapter() {
+// Create a test schema
+let table_schema = Arc::n
