This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new bd249ba5b9 Add documentation for UNION schema handling. (#17248)
bd249ba5b9 is described below
commit bd249ba5b9d9818ce82e515e8dcb1c83c054e32e
Author: wiedld <[email protected]>
AuthorDate: Fri Aug 22 12:50:50 2025 -0700
Add documentation for UNION schema handling. (#17248)
* refactor: consolidate and document union intersection of field metadata
* chore: document how schemas are unioned in the analyzer
* chore: add concrete examples, linking to datafusion-examples
* inline and rename function
* complete rename
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/expr/src/expr.rs | 59 ++++++++++++++++++++++
datafusion/expr/src/logical_plan/plan.rs | 37 +++++---------
datafusion/optimizer/src/analyzer/type_coercion.rs | 37 ++++++++++++++
3 files changed, 110 insertions(+), 23 deletions(-)
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 2324ae79c0..0eef8a00a9 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -642,6 +642,65 @@ impl From<&HashMap<String, String>> for FieldMetadata {
}
}
+/// The metadata used in [`Field::metadata`].
+///
+/// This represents the metadata associated with an Arrow [`Field`]. The metadata consists of key-value pairs.
+///
+/// # Common Use Cases
+///
+/// Field metadata is commonly used to store:
+/// - Default values for columns when data is missing
+/// - Column descriptions or documentation
+/// - Data lineage information
+/// - Custom application-specific annotations
+/// - Encoding hints or display formatting preferences
+///
+/// # Example: Storing Default Values
+///
+/// A practical example of using field metadata is storing default values for columns
+/// that may be missing in the physical data but present in the logical schema.
+/// See the [default_column_values.rs] example implementation.
+///
+/// [default_column_values.rs]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/default_column_values.rs
+pub type SchemaFieldMetadata = std::collections::HashMap<String, String>;
+
+/// Intersects multiple metadata instances for UNION operations.
+///
+/// This function implements the intersection strategy used by UNION operations,
+/// where only metadata keys that exist in ALL inputs with identical values
+/// are preserved in the result.
+///
+/// # Union Metadata Behavior
+///
+/// Union operations require consistent metadata across all branches:
+/// - Only metadata keys present in ALL union branches are kept
+/// - For each kept key, the value must be identical across all branches
+/// - If a key has different values across branches, it is excluded from the result
+/// - If any input has no metadata, the result will be empty
+///
+/// # Arguments
+///
+/// * `metadatas` - An iterator of `SchemaFieldMetadata` instances to intersect
+///
+/// # Returns
+///
+/// A new `SchemaFieldMetadata` containing only the intersected metadata
+pub fn intersect_metadata_for_union<'a>(
+ metadatas: impl IntoIterator<Item = &'a SchemaFieldMetadata>,
+) -> SchemaFieldMetadata {
+ let mut metadatas = metadatas.into_iter();
+ let Some(mut intersected) = metadatas.next().cloned() else {
+ return Default::default();
+ };
+
+ for metadata in metadatas {
+ // Only keep keys that exist in both with the same value
+ intersected.retain(|k, v| metadata.get(k) == Some(v));
+ }
+
+ intersected
+}
+
/// UNNEST expression.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct Unnest {
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 8df6c22db6..887afd7cde 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -31,7 +31,10 @@ use super::invariants::{
};
use super::DdlStatement;
use crate::builder::{change_redundant_column, unnest_with_options};
-use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction, WindowFunctionParams};
+use crate::expr::{
+ intersect_metadata_for_union, Placeholder, Sort as SortExpr, WindowFunction,
+ WindowFunctionParams,
+};
use crate::expr_rewriter::{
create_col_from_scalar_expr, normalize_cols, normalize_sorts, NamePreserver,
};
@@ -2799,15 +2802,16 @@ impl Union {
let mut field =
Field::new(name, data_type.clone(), final_is_nullable);
- field.set_metadata(intersect_maps(unmerged_metadata));
+ field.set_metadata(intersect_metadata_for_union(unmerged_metadata));
(None, Arc::new(field))
},
)
.collect::<Vec<(Option<TableReference>, _)>>();
- let union_schema_metadata =
- intersect_maps(inputs.iter().map(|input| input.schema().metadata()));
+ let union_schema_metadata = intersect_metadata_for_union(
+ inputs.iter().map(|input| input.schema().metadata()),
+ );
// Functional Dependencies are not preserved after UNION operation
let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
@@ -2876,14 +2880,16 @@ impl Union {
};
let mut field = Field::new(&name, data_type.clone(), nullable);
- let field_metadata =
- intersect_maps(fields.iter().map(|field| field.metadata()));
+ let field_metadata = intersect_metadata_for_union(
+ fields.iter().map(|field| field.metadata()),
+ );
field.set_metadata(field_metadata);
Ok((None, Arc::new(field)))
})
.collect::<Result<_>>()?;
- let union_schema_metadata =
- intersect_maps(inputs.iter().map(|input| input.schema().metadata()));
+ let union_schema_metadata = intersect_metadata_for_union(
+ inputs.iter().map(|input| input.schema().metadata()),
+ );
// Functional Dependencies are not preserved after UNION operation
let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
@@ -2893,21 +2899,6 @@ impl Union {
}
}
-fn intersect_maps<'a>(
- inputs: impl IntoIterator<Item = &'a HashMap<String, String>>,
-) -> HashMap<String, String> {
- let mut inputs = inputs.into_iter();
- let mut merged: HashMap<String, String> = inputs.next().cloned().unwrap_or_default();
- for input in inputs {
- // The extra dereference below (`&*v`) is a workaround for https://github.com/rkyv/rkyv/issues/434.
- // When this crate is used in a workspace that enables the `rkyv-64` feature in the `chrono` crate,
- // this triggers a Rust compilation error:
- // error[E0277]: can't compare `Option<&std::string::String>` with `Option<&mut std::string::String>`.
- merged.retain(|k, v| input.get(k) == Some(&*v));
- }
- merged
-}
-
// Manual implementation needed because of `schema` field. Comparison excludes this field.
impl PartialOrd for Union {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 7b4726c309..7d4920a6cb 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -957,6 +957,43 @@ fn coerce_case_expression(case: Case, schema: &DFSchema) -> Result<Case> {
///
/// This method presumes that the wildcard expansion is unneeded, or has already
/// been applied.
+///
+/// ## Schema and Field Handling in Union Coercion
+///
+/// **Processing order**: The function starts with the base schema (first input) and then
+/// processes remaining inputs sequentially, with later inputs taking precedence in merging.
+///
+/// **Schema-level metadata merging**: Later schemas take precedence for duplicate keys.
+///
+/// **Field-level metadata merging**: Later fields take precedence for duplicate metadata keys.
+///
+/// **Type coercion precedence**: The coerced type is determined by iteratively applying
+/// `comparison_coercion()` between the accumulated type and each new input's type. The
+/// result depends on type coercion rules, not input order.
+///
+/// **Nullability merging**: Nullability is accumulated using logical OR (`||`).
+/// Once any input field is nullable, the result field becomes nullable permanently.
+/// Later inputs can make a field nullable but cannot make it non-nullable.
+///
+/// **Field precedence**: Field names come from the first (base) schema, but the field properties
+/// (nullability and field-level metadata) have later schemas taking precedence.
+///
+/// **Example**:
+/// ```sql
+/// SELECT a, b FROM table1 -- a: Int32, metadata {"source": "t1"}, nullable=false
+/// UNION
+/// SELECT a, b FROM table2 -- a: Int64, metadata {"source": "t2"}, nullable=true
+/// UNION
+/// SELECT a, b FROM table3 -- a: Int32, metadata {"encoding": "utf8"}, nullable=false
+/// -- Result:
+/// -- a: Int64 (from type coercion), nullable=true (from table2),
+/// -- metadata: {"source": "t2", "encoding": "utf8"} (later inputs take precedence)
+/// ```
+///
+/// **Precedence Summary**:
+/// - **Datatypes**: Determined by `comparison_coercion()` rules, not input order
+/// - **Nullability**: Later inputs can add nullability but cannot remove it (logical OR)
+/// - **Metadata**: Later inputs take precedence for same keys (HashMap::extend semantics)
pub fn coerce_union_schema(inputs: &[Arc<LogicalPlan>]) -> Result<DFSchema> {
coerce_union_schema_with_schema(&inputs[1..], inputs[0].schema())
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]