Blizzara commented on code in PR #12245:
URL: https://github.com/apache/datafusion/pull/12245#discussion_r1740400324
##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -1601,12 +1647,17 @@ fn from_substrait_named_struct(
);
if name_idx != base_schema.names.len() {
return substrait_err!(
- "Names list must match exactly to nested
schema, but found {} uses for {} names",
- name_idx,
- base_schema.names.len()
- );
+ "Names list must match exactly to nested schema, but found {} uses
for {} names",
+ name_idx,
+ base_schema.names.len()
+ );
+ }
+ let mut df_schema = DFSchema::try_from(Schema::new(fields?))?;
+ match field_qualifier {
+ None => (),
+ Some(fq) => df_schema = df_schema.replace_qualifier(fq),
Review Comment:
Is this needed for something? Either way, rather than taking in an
optional parameter, maybe just call `.replace_qualifier` on the returned
schema where it's needed.
##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -850,6 +868,31 @@ pub async fn from_substrait_rel(
}
}
+/// Validates that the given Substrait schema matches the given DataFusion
schema
+/// Returns true if the two schemas have the same qualified named fields with
the same data types.
+/// Returns false otherwise.
+/// Ignores case when comparing field names
+///
+/// This code is equivalent to [DFSchema::equivalent_names_and_types] except
that the field name
+/// checking is case-insensitive
+fn validate_substrait_schema(
+ datafusion_schema: &DFSchema,
+ substrait_schema: &DFSchema,
+) -> bool {
+ if datafusion_schema.fields().len() != substrait_schema.fields().len() {
+ return false;
+ }
+ let datafusion_fields = datafusion_schema.iter();
+ let substrait_fields = substrait_schema.iter();
+ datafusion_fields
+ .zip(substrait_fields)
+ .all(|((q1, f1), (q2, f2))| {
+ q1 == q2
+ && f1.name().to_lowercase() == f2.name().to_lowercase()
+ && f1.data_type().equals_datatype(f2.data_type())
Review Comment:
I think this asserts the nullability of inner fields in things like lists.
That's probably too strict; we should accept cases where the DF schema is
non-nullable but the Substrait schema allows nulls.
##########
datafusion/substrait/src/logical_plan/producer.rs:
##########
@@ -140,19 +140,13 @@ pub fn to_substrait_rel(
maintain_singular_struct: false,
});
+ let table_schema = scan.source.schema().to_dfschema_ref()?;
+ let base_schema = to_substrait_named_struct(&table_schema,
extensions)?;
+
Ok(Box::new(Rel {
rel_type: Some(RelType::Read(Box::new(ReadRel {
common: None,
- base_schema: Some(NamedStruct {
- names: scan
- .source
- .schema()
- .fields()
- .iter()
- .map(|f| f.name().to_owned())
- .collect(),
- r#struct: None,
- }),
+ base_schema: Some(base_schema),
Review Comment:
nice!
##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -1586,9 +1629,12 @@ fn next_struct_field_name(
}
}
-fn from_substrait_named_struct(
+/// Convert Substrait NamedStruct to DataFusion DFSchemaRef
+pub fn from_substrait_named_struct(
Review Comment:
you can do `pub(crate) fn`
##########
datafusion/substrait/tests/utils.rs:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[cfg(test)]
+pub mod test {
+ use datafusion::catalog_common::TableReference;
+ use datafusion::datasource::empty::EmptyTable;
+ use datafusion::prelude::SessionContext;
+ use datafusion_substrait::extensions::Extensions;
+ use
datafusion_substrait::logical_plan::consumer::from_substrait_named_struct;
+ use std::fs::File;
+ use std::io::BufReader;
+ use std::sync::Arc;
+ use substrait::proto::read_rel::{NamedTable, ReadType};
+ use substrait::proto::rel::RelType;
+ use substrait::proto::{Plan, ReadRel, Rel};
+
+ pub fn read_json(path: &str) -> Plan {
+ serde_json::from_reader::<_, Plan>(BufReader::new(
+ File::open(path).expect("file not found"),
+ ))
+ .expect("failed to parse json")
+ }
+
+ pub struct TestSchemaCollector {
+ ctx: SessionContext,
+ }
+
+ impl TestSchemaCollector {
+ fn new() -> Self {
+ TestSchemaCollector {
+ ctx: SessionContext::new(),
+ }
+ }
+
+ pub fn generate_context_from_plan(plan: &Plan) -> SessionContext {
+ let mut schema_collector = Self::new();
+
+ for plan_rel in plan.relations.iter() {
+ match plan_rel.rel_type.as_ref().expect("blah") {
Review Comment:
"blah" ? :)
##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -850,6 +868,31 @@ pub async fn from_substrait_rel(
}
}
+/// Validates that the given Substrait schema matches the given DataFusion
schema
Review Comment:
I feel like requiring an exact match is unnecessarily harsh. I'd prefer
checking that the Substrait schema is a subset of the DF schema. For example,
I could see our use case storing a given Substrait plan and running it again
and again over time, while the input dataset may change to add new columns.
There's no reason why that shouldn't work, right?
(See also the comment about nullability on the `equals_datatype` check.)
##########
datafusion/substrait/tests/utils.rs:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[cfg(test)]
+pub mod test {
+ use datafusion::catalog_common::TableReference;
+ use datafusion::datasource::empty::EmptyTable;
+ use datafusion::prelude::SessionContext;
+ use datafusion_substrait::extensions::Extensions;
+ use
datafusion_substrait::logical_plan::consumer::from_substrait_named_struct;
+ use std::fs::File;
+ use std::io::BufReader;
+ use std::sync::Arc;
+ use substrait::proto::read_rel::{NamedTable, ReadType};
+ use substrait::proto::rel::RelType;
+ use substrait::proto::{Plan, ReadRel, Rel};
+
+ pub fn read_json(path: &str) -> Plan {
+ serde_json::from_reader::<_, Plan>(BufReader::new(
+ File::open(path).expect("file not found"),
+ ))
+ .expect("failed to parse json")
+ }
+
+ pub struct TestSchemaCollector {
+ ctx: SessionContext,
+ }
+
+ impl TestSchemaCollector {
+ fn new() -> Self {
+ TestSchemaCollector {
+ ctx: SessionContext::new(),
+ }
+ }
+
+ pub fn generate_context_from_plan(plan: &Plan) -> SessionContext {
+ let mut schema_collector = Self::new();
+
+ for plan_rel in plan.relations.iter() {
+ match plan_rel.rel_type.as_ref().expect("blah") {
+ substrait::proto::plan_rel::RelType::Rel(r) => {
+ schema_collector.collect_schemas(r)
+ }
+ substrait::proto::plan_rel::RelType::Root(r) =>
schema_collector
+ .collect_schemas(
+ r.input.as_ref().expect("RelRoot must set input"),
+ ),
+ }
+ }
+ schema_collector.ctx
+ }
+
+ fn collect_named_table(&mut self, read: &ReadRel, nt: &NamedTable) {
+ let table_reference = match nt.names.len() {
+ 0 => {
+ panic!("No table name found in NamedTable");
+ }
+ 1 => TableReference::Bare {
+ table: nt.names[0].clone().into(),
+ },
+ 2 => TableReference::Partial {
+ schema: nt.names[0].clone().into(),
+ table: nt.names[1].clone().into(),
+ },
+ _ => TableReference::Full {
+ catalog: nt.names[0].clone().into(),
+ schema: nt.names[1].clone().into(),
+ table: nt.names[2].clone().into(),
+ },
+ };
+
+ let substrait_schema = read
+ .base_schema
+ .as_ref()
+ .expect("No base schema found for NamedTable");
+ let empty_extensions = Extensions {
+ functions: Default::default(),
+ types: Default::default(),
+ type_variations: Default::default(),
+ };
+
+ let df_schema = from_substrait_named_struct(
+ substrait_schema,
+ &empty_extensions,
+ Some(table_reference.clone()),
+ )
+ .expect("Unable to generate DataFusion schema from Substrait
NamedStruct");
+ let table = EmptyTable::new(df_schema.inner().clone());
+ self.ctx
Review Comment:
It might be nicer to just return the table name and schema rather than
modifying `ctx` directly (that would also better match the name "collect").
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]