This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 479a277e45 Relax physical schema validation (#14519)
479a277e45 is described below
commit 479a277e4573e31f4a7897d78109259f4ef66104
Author: Piotr Findeisen <[email protected]>
AuthorDate: Fri Feb 7 18:08:02 2025 +0100
Relax physical schema validation (#14519)
A physical plan can be further optimized. In particular, an expression can
be determined as never null even if that wasn't known at the time of
logical planning. Thus, the final schema check needs to be relaxed,
allowing now-non-null data where nullable data was expected. This
replaces the schema equality check with an asymmetric "is satisfied by"
relation.
---
datafusion/core/src/lib.rs | 1 +
datafusion/core/src/physical_planner.rs | 8 ++-
datafusion/core/src/schema_equivalence.rs | 84 ++++++++++++++++++++++++++++
datafusion/sqllogictest/test_files/union.slt | 10 ++++
4 files changed, 101 insertions(+), 2 deletions(-)
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index 9d42580178..70b302a55c 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -812,6 +812,7 @@ pub mod variable {
#[cfg(not(target_arch = "wasm32"))]
pub mod test;
+mod schema_equivalence;
pub mod test_util;
#[cfg(doctest)]
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index cee3acc08d..d96e60c25f 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -89,6 +89,7 @@ use
datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::unnest::ListUnnest;
use datafusion_sql::utils::window_expr_common_partition_keys;
+use crate::schema_equivalence::schema_satisfied_by;
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use itertools::{multiunzip, Itertools};
@@ -659,7 +660,10 @@ impl DefaultPhysicalPlanner {
let physical_input_schema_from_logical =
logical_input_schema.inner();
if !options.execution.skip_physical_aggregate_schema_check
- && &physical_input_schema !=
physical_input_schema_from_logical
+ && !schema_satisfied_by(
+ physical_input_schema_from_logical,
+ &physical_input_schema,
+ )
{
let mut differences = Vec::new();
if physical_input_schema.fields().len()
@@ -688,7 +692,7 @@ impl DefaultPhysicalPlanner {
if physical_field.data_type() !=
logical_field.data_type() {
differences.push(format!("field data type at index
{} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(),
physical_field.data_type(), logical_field.data_type()));
}
- if physical_field.is_nullable() !=
logical_field.is_nullable() {
+ if physical_field.is_nullable() &&
!logical_field.is_nullable() {
differences.push(format!("field nullability at
index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(),
physical_field.is_nullable(), logical_field.is_nullable()));
}
}
diff --git a/datafusion/core/src/schema_equivalence.rs
b/datafusion/core/src/schema_equivalence.rs
new file mode 100644
index 0000000000..f0d2acad6b
--- /dev/null
+++ b/datafusion/core/src/schema_equivalence.rs
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_schema::{DataType, Field, Fields, Schema};
+
+/// Verifies whether the original planned schema can be satisfied with data
+/// adhering to the candidate schema. In practice, this is an equality check on
the
+/// schemas, except that the original schema can have nullable fields where the
candidate
+/// is constrained to not provide null data.
+pub(crate) fn schema_satisfied_by(original: &Schema, candidate: &Schema) ->
bool {
+ original.metadata() == candidate.metadata()
+ && fields_satisfied_by(original.fields(), candidate.fields())
+}
+
+/// See [`schema_satisfied_by`] for the contract.
+fn fields_satisfied_by(original: &Fields, candidate: &Fields) -> bool {
+ original.len() == candidate.len()
+ && original
+ .iter()
+ .zip(candidate)
+ .all(|(original, candidate)| field_satisfied_by(original,
candidate))
+}
+
+/// See [`schema_satisfied_by`] for the contract.
+fn field_satisfied_by(original: &Field, candidate: &Field) -> bool {
+ original.name() == candidate.name()
+ && (original.is_nullable() || !candidate.is_nullable())
+ && original.metadata() == candidate.metadata()
+ && data_type_satisfied_by(original.data_type(), candidate.data_type())
+}
+
+/// See [`schema_satisfied_by`] for the contract.
+fn data_type_satisfied_by(original: &DataType, candidate: &DataType) -> bool {
+ match (original, candidate) {
+ (DataType::List(original_field), DataType::List(candidate_field)) => {
+ field_satisfied_by(original_field, candidate_field)
+ }
+
+ (DataType::ListView(original_field),
DataType::ListView(candidate_field)) => {
+ field_satisfied_by(original_field, candidate_field)
+ }
+
+ (
+ DataType::FixedSizeList(original_field, original_size),
+ DataType::FixedSizeList(candidate_field, candidate_size),
+ ) => {
+ original_size == candidate_size
+ && field_satisfied_by(original_field, candidate_field)
+ }
+
+ (DataType::LargeList(original_field),
DataType::LargeList(candidate_field)) => {
+ field_satisfied_by(original_field, candidate_field)
+ }
+
+ (
+ DataType::LargeListView(original_field),
+ DataType::LargeListView(candidate_field),
+ ) => field_satisfied_by(original_field, candidate_field),
+
+ (DataType::Struct(original_fields),
DataType::Struct(candidate_fields)) => {
+ fields_satisfied_by(original_fields, candidate_fields)
+ }
+
+ // TODO (DataType::Union(_, _), DataType::Union(_, _)) => {}
+ // TODO (DataType::Dictionary(_, _), DataType::Dictionary(_, _)) => {}
+ // TODO (DataType::Map(_, _), DataType::Map(_, _)) => {}
+ // TODO (DataType::RunEndEncoded(_, _), DataType::RunEndEncoded(_, _))
=> {}
+ _ => original == candidate,
+ }
+}
diff --git a/datafusion/sqllogictest/test_files/union.slt
b/datafusion/sqllogictest/test_files/union.slt
index 484743fc16..dfac9c0310 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -850,3 +850,13 @@ FROM (
----
NULL false
foo true
+
+query T
+SELECT combined
+FROM (
+ SELECT concat('A', 'B') AS combined UNION ALL
+ SELECT concat('A', 'B') AS combined
+)
+GROUP BY combined
+----
+AB
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]