This is an automated email from the ASF dual-hosted git repository.
ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a2ac00da1b Improve Union Equivalence Propagation (#11506)
a2ac00da1b is described below
commit a2ac00da1b3aa7879317ae88d1b356b27f49f887
Author: Mustafa Akur <[email protected]>
AuthorDate: Mon Jul 22 23:51:43 2024 +0300
Improve Union Equivalence Propagation (#11506)
* Initial commit
* Fix formatting
* Minor changes
* Fix failing test
* Change union calculation algorithm to make it symmetric
* Minor changes
* Add unit tests
* Simplifications
* Review Part 1
* Move test and union equivalence
* Add new tests
* Support for union with different schema
* Address reviews
* Review Part 2
* Add new tests
* Final Review
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
.../physical-expr-common/src/physical_expr.rs | 33 +-
datafusion/physical-expr/src/equivalence/mod.rs | 4 +-
.../physical-expr/src/equivalence/properties.rs | 641 ++++++++++++++++++---
datafusion/physical-expr/src/lib.rs | 2 +-
datafusion/physical-plan/src/common.rs | 356 +-----------
datafusion/physical-plan/src/union.rs | 115 ++--
datafusion/sqllogictest/test_files/order.slt | 7 +
7 files changed, 647 insertions(+), 511 deletions(-)
diff --git a/datafusion/physical-expr-common/src/physical_expr.rs
b/datafusion/physical-expr-common/src/physical_expr.rs
index 1998f14396..c74fb9c2d1 100644
--- a/datafusion/physical-expr-common/src/physical_expr.rs
+++ b/datafusion/physical-expr-common/src/physical_expr.rs
@@ -20,13 +20,15 @@ use std::fmt::{Debug, Display};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
+use crate::expressions::column::Column;
use crate::utils::scatter;
use arrow::array::BooleanArray;
use arrow::compute::filter_record_batch;
-use arrow::datatypes::{DataType, Schema};
+use arrow::datatypes::{DataType, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
-use datafusion_common::{internal_err, not_impl_err, Result};
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::{internal_err, not_impl_err, plan_err, Result};
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::sort_properties::ExprProperties;
use datafusion_expr::ColumnarValue;
@@ -191,6 +193,33 @@ pub fn with_new_children_if_necessary(
}
}
+/// Rewrites an expression according to new schema; i.e. changes the columns it
+/// refers to with the column at the corresponding index in the new schema. Returns
+/// an error if the given schema has fewer columns than the original schema.
+/// Note that the resulting expression may not be valid if data types in the
+/// new schema are incompatible with expression nodes.
+pub fn with_new_schema(
+ expr: Arc<dyn PhysicalExpr>,
+ schema: &SchemaRef,
+) -> Result<Arc<dyn PhysicalExpr>> {
+ Ok(expr
+ .transform_up(|expr| {
+ if let Some(col) = expr.as_any().downcast_ref::<Column>() {
+ let idx = col.index();
+ let Some(field) = schema.fields().get(idx) else {
+ return plan_err!(
+ "New schema has fewer columns than original schema"
+ );
+ };
+ let new_col = Column::new(field.name(), idx);
+ Ok(Transformed::yes(Arc::new(new_col) as _))
+ } else {
+ Ok(Transformed::no(expr))
+ }
+ })?
+ .data)
+}
+
pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
if any.is::<Arc<dyn PhysicalExpr>>() {
any.downcast_ref::<Arc<dyn PhysicalExpr>>()
diff --git a/datafusion/physical-expr/src/equivalence/mod.rs
b/datafusion/physical-expr/src/equivalence/mod.rs
index 83f94057f7..b9228282b0 100644
--- a/datafusion/physical-expr/src/equivalence/mod.rs
+++ b/datafusion/physical-expr/src/equivalence/mod.rs
@@ -30,7 +30,9 @@ mod properties;
pub use class::{ConstExpr, EquivalenceClass, EquivalenceGroup};
pub use ordering::OrderingEquivalenceClass;
pub use projection::ProjectionMapping;
-pub use properties::{join_equivalence_properties, EquivalenceProperties};
+pub use properties::{
+ calculate_union, join_equivalence_properties, EquivalenceProperties,
+};
/// This function constructs a duplicate-free `LexOrderingReq` by filtering out
/// duplicate entries that have same physical expression inside. For example,
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs
b/datafusion/physical-expr/src/equivalence/properties.rs
index 8c327fbaf4..64c22064d4 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -21,7 +21,8 @@ use std::sync::Arc;
use super::ordering::collapse_lex_ordering;
use crate::equivalence::class::const_exprs_contains;
use crate::equivalence::{
- collapse_lex_req, EquivalenceGroup, OrderingEquivalenceClass,
ProjectionMapping,
+ collapse_lex_req, EquivalenceClass, EquivalenceGroup,
OrderingEquivalenceClass,
+ ProjectionMapping,
};
use crate::expressions::Literal;
use crate::{
@@ -32,11 +33,12 @@ use crate::{
use arrow_schema::{SchemaRef, SortOptions};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::{JoinSide, JoinType, Result};
+use datafusion_common::{plan_err, JoinSide, JoinType, Result};
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
use datafusion_physical_expr_common::expressions::column::Column;
use datafusion_physical_expr_common::expressions::CastExpr;
+use datafusion_physical_expr_common::physical_expr::with_new_schema;
use datafusion_physical_expr_common::utils::ExprPropertiesNode;
use indexmap::{IndexMap, IndexSet};
@@ -536,33 +538,6 @@ impl EquivalenceProperties {
.then_some(if lhs.len() >= rhs.len() { lhs } else { rhs })
}
- /// Calculates the "meet" of the given orderings (`lhs` and `rhs`).
- /// The meet of a set of orderings is the finest ordering that is satisfied
- /// by all the orderings in that set. For details, see:
- ///
- /// <https://en.wikipedia.org/wiki/Join_and_meet>
- ///
- /// If there is no ordering that satisfies both `lhs` and `rhs`, returns
- /// `None`. As an example, the meet of orderings `[a ASC]` and `[a ASC, b
ASC]`
- /// is `[a ASC]`.
- pub fn get_meet_ordering(
- &self,
- lhs: LexOrderingRef,
- rhs: LexOrderingRef,
- ) -> Option<LexOrdering> {
- let lhs = self.normalize_sort_exprs(lhs);
- let rhs = self.normalize_sort_exprs(rhs);
- let mut meet = vec![];
- for (lhs, rhs) in lhs.into_iter().zip(rhs.into_iter()) {
- if lhs.eq(&rhs) {
- meet.push(lhs);
- } else {
- break;
- }
- }
- (!meet.is_empty()).then_some(meet)
- }
-
/// we substitute the ordering according to input expression type, this is
a simplified version
/// In this case, we just substitute when the expression satisfy the
following condition:
/// I. just have one column and is a CAST expression
@@ -1007,6 +982,74 @@ impl EquivalenceProperties {
.map(|node| node.data)
.unwrap_or(ExprProperties::new_unknown())
}
+
+ /// Transforms this `EquivalenceProperties` into a new
`EquivalenceProperties`
+ /// by mapping columns in the original schema to columns in the new schema
+ /// by index.
+ pub fn with_new_schema(self, schema: SchemaRef) -> Result<Self> {
+        // The new schema and the original schema are aligned when they have the
+ // same number of columns, and fields at the same index have the same
+ // type in both schemas.
+ let schemas_aligned = (self.schema.fields.len() == schema.fields.len())
+ && self
+ .schema
+ .fields
+ .iter()
+ .zip(schema.fields.iter())
+ .all(|(lhs, rhs)| lhs.data_type().eq(rhs.data_type()));
+ if !schemas_aligned {
+ // Rewriting equivalence properties in terms of new schema is not
+ // safe when schemas are not aligned:
+ return plan_err!(
+ "Cannot rewrite old_schema:{:?} with new schema: {:?}",
+ self.schema,
+ schema
+ );
+ }
+ // Rewrite constants according to new schema:
+ let new_constants = self
+ .constants
+ .into_iter()
+ .map(|const_expr| {
+ let across_partitions = const_expr.across_partitions();
+ let new_const_expr = with_new_schema(const_expr.owned_expr(),
&schema)?;
+ Ok(ConstExpr::new(new_const_expr)
+ .with_across_partitions(across_partitions))
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ // Rewrite orderings according to new schema:
+ let mut new_orderings = vec![];
+ for ordering in self.oeq_class.orderings {
+ let new_ordering = ordering
+ .into_iter()
+ .map(|mut sort_expr| {
+ sort_expr.expr = with_new_schema(sort_expr.expr, &schema)?;
+ Ok(sort_expr)
+ })
+ .collect::<Result<_>>()?;
+ new_orderings.push(new_ordering);
+ }
+
+ // Rewrite equivalence classes according to the new schema:
+ let mut eq_classes = vec![];
+ for eq_class in self.eq_group.classes {
+ let new_eq_exprs = eq_class
+ .into_vec()
+ .into_iter()
+ .map(|expr| with_new_schema(expr, &schema))
+ .collect::<Result<_>>()?;
+ eq_classes.push(EquivalenceClass::new(new_eq_exprs));
+ }
+
+ // Construct the resulting equivalence properties:
+ let mut result = EquivalenceProperties::new(schema);
+ result.constants = new_constants;
+ result.add_new_orderings(new_orderings);
+ result.add_equivalence_group(EquivalenceGroup::new(eq_classes));
+
+ Ok(result)
+ }
}
/// Calculates the properties of a given [`ExprPropertiesNode`].
@@ -1484,6 +1527,84 @@ impl Hash for ExprWrapper {
}
}
+/// Calculates the union (in the sense of `UnionExec`) `EquivalenceProperties`
+/// of `lhs` and `rhs` according to the schema of `lhs`.
+fn calculate_union_binary(
+ lhs: EquivalenceProperties,
+ mut rhs: EquivalenceProperties,
+) -> Result<EquivalenceProperties> {
+ // TODO: In some cases, we should be able to preserve some equivalence
+ // classes. Add support for such cases.
+
+ // Harmonize the schema of the rhs with the schema of the lhs (which is
the accumulator schema):
+ if !rhs.schema.eq(&lhs.schema) {
+ rhs = rhs.with_new_schema(Arc::clone(&lhs.schema))?;
+ }
+
+ // First, calculate valid constants for the union. A quantity is constant
+ // after the union if it is constant in both sides.
+ let constants = lhs
+ .constants()
+ .iter()
+ .filter(|const_expr| const_exprs_contains(rhs.constants(),
const_expr.expr()))
+ .map(|const_expr| {
+ // TODO: When both sides' constants are valid across partitions,
+ // the union's constant should also be valid if values are
+ // the same. However, we do not have the capability to
+ // check this yet.
+
ConstExpr::new(Arc::clone(const_expr.expr())).with_across_partitions(false)
+ })
+ .collect();
+
+ // Next, calculate valid orderings for the union by searching for prefixes
+ // in both sides.
+ let mut orderings = vec![];
+ for mut ordering in lhs.normalized_oeq_class().orderings {
+ // Progressively shorten the ordering to search for a satisfied prefix:
+ while !rhs.ordering_satisfy(&ordering) {
+ ordering.pop();
+ }
+ // There is a non-trivial satisfied prefix, add it as a valid ordering:
+ if !ordering.is_empty() {
+ orderings.push(ordering);
+ }
+ }
+ for mut ordering in rhs.normalized_oeq_class().orderings {
+ // Progressively shorten the ordering to search for a satisfied prefix:
+ while !lhs.ordering_satisfy(&ordering) {
+ ordering.pop();
+ }
+ // There is a non-trivial satisfied prefix, add it as a valid ordering:
+ if !ordering.is_empty() {
+ orderings.push(ordering);
+ }
+ }
+ let mut eq_properties = EquivalenceProperties::new(lhs.schema);
+ eq_properties.constants = constants;
+ eq_properties.add_new_orderings(orderings);
+ Ok(eq_properties)
+}
+
+/// Calculates the union (in the sense of `UnionExec`) `EquivalenceProperties`
+/// of the given `EquivalenceProperties` in `eqps` according to the given
+/// output `schema` (which need not be the same as the schemas of the inputs
+/// in `eqps`, as details such as nullability may be different).
+pub fn calculate_union(
+ eqps: Vec<EquivalenceProperties>,
+ schema: SchemaRef,
+) -> Result<EquivalenceProperties> {
+ // TODO: In some cases, we should be able to preserve some equivalence
+ // classes. Add support for such cases.
+ let mut init = eqps[0].clone();
+ // Harmonize the schema of the init with the schema of the union:
+ if !init.schema.eq(&schema) {
+ init = init.with_new_schema(schema)?;
+ }
+ eqps.into_iter()
+ .skip(1)
+ .try_fold(init, calculate_union_binary)
+}
+
#[cfg(test)]
mod tests {
use std::ops::Not;
@@ -2188,50 +2309,6 @@ mod tests {
Ok(())
}
- #[test]
- fn test_get_meet_ordering() -> Result<()> {
- let schema = create_test_schema()?;
- let col_a = &col("a", &schema)?;
- let col_b = &col("b", &schema)?;
- let eq_properties = EquivalenceProperties::new(schema);
- let option_asc = SortOptions {
- descending: false,
- nulls_first: false,
- };
- let option_desc = SortOptions {
- descending: true,
- nulls_first: true,
- };
- let tests_cases = vec![
- // Get meet ordering between [a ASC] and [a ASC, b ASC]
- // result should be [a ASC]
- (
- vec![(col_a, option_asc)],
- vec![(col_a, option_asc), (col_b, option_asc)],
- Some(vec![(col_a, option_asc)]),
- ),
- // Get meet ordering between [a ASC] and [a DESC]
- // result should be None.
- (vec![(col_a, option_asc)], vec![(col_a, option_desc)], None),
- // Get meet ordering between [a ASC, b ASC] and [a ASC, b DESC]
- // result should be [a ASC].
- (
- vec![(col_a, option_asc), (col_b, option_asc)],
- vec![(col_a, option_asc), (col_b, option_desc)],
- Some(vec![(col_a, option_asc)]),
- ),
- ];
- for (lhs, rhs, expected) in tests_cases {
- let lhs = convert_to_sort_exprs(&lhs);
- let rhs = convert_to_sort_exprs(&rhs);
- let expected = expected.map(|expected|
convert_to_sort_exprs(&expected));
- let finer = eq_properties.get_meet_ordering(&lhs, &rhs);
- assert_eq!(finer, expected)
- }
-
- Ok(())
- }
-
#[test]
fn test_get_finer() -> Result<()> {
let schema = create_test_schema()?;
@@ -2525,4 +2602,422 @@ mod tests {
Ok(())
}
+
+ fn append_fields(schema: &SchemaRef, text: &str) -> SchemaRef {
+ Arc::new(Schema::new(
+ schema
+ .fields()
+ .iter()
+ .map(|field| {
+ Field::new(
+ // Annotate name with `text`:
+ format!("{}{}", field.name(), text),
+ field.data_type().clone(),
+ field.is_nullable(),
+ )
+ })
+ .collect::<Vec<_>>(),
+ ))
+ }
+
+ #[tokio::test]
+ async fn test_union_equivalence_properties_multi_children() -> Result<()> {
+ let schema = create_test_schema()?;
+ let schema2 = append_fields(&schema, "1");
+ let schema3 = append_fields(&schema, "2");
+ let test_cases = vec![
+ // --------- TEST CASE 1 ----------
+ (
+ vec![
+ // Children 1
+ (
+ // Orderings
+ vec![vec!["a", "b", "c"]],
+ Arc::clone(&schema),
+ ),
+ // Children 2
+ (
+ // Orderings
+ vec![vec!["a1", "b1", "c1"]],
+ Arc::clone(&schema2),
+ ),
+ // Children 3
+ (
+ // Orderings
+ vec![vec!["a2", "b2"]],
+ Arc::clone(&schema3),
+ ),
+ ],
+ // Expected
+ vec![vec!["a", "b"]],
+ ),
+ // --------- TEST CASE 2 ----------
+ (
+ vec![
+ // Children 1
+ (
+ // Orderings
+ vec![vec!["a", "b", "c"]],
+ Arc::clone(&schema),
+ ),
+ // Children 2
+ (
+ // Orderings
+ vec![vec!["a1", "b1", "c1"]],
+ Arc::clone(&schema2),
+ ),
+ // Children 3
+ (
+ // Orderings
+ vec![vec!["a2", "b2", "c2"]],
+ Arc::clone(&schema3),
+ ),
+ ],
+ // Expected
+ vec![vec!["a", "b", "c"]],
+ ),
+ // --------- TEST CASE 3 ----------
+ (
+ vec![
+ // Children 1
+ (
+ // Orderings
+ vec![vec!["a", "b"]],
+ Arc::clone(&schema),
+ ),
+ // Children 2
+ (
+ // Orderings
+ vec![vec!["a1", "b1", "c1"]],
+ Arc::clone(&schema2),
+ ),
+ // Children 3
+ (
+ // Orderings
+ vec![vec!["a2", "b2", "c2"]],
+ Arc::clone(&schema3),
+ ),
+ ],
+ // Expected
+ vec![vec!["a", "b"]],
+ ),
+ // --------- TEST CASE 4 ----------
+ (
+ vec![
+ // Children 1
+ (
+ // Orderings
+ vec![vec!["a", "b"]],
+ Arc::clone(&schema),
+ ),
+ // Children 2
+ (
+ // Orderings
+ vec![vec!["a1", "b1"]],
+ Arc::clone(&schema2),
+ ),
+ // Children 3
+ (
+ // Orderings
+ vec![vec!["b2", "c2"]],
+ Arc::clone(&schema3),
+ ),
+ ],
+ // Expected
+ vec![],
+ ),
+ // --------- TEST CASE 5 ----------
+ (
+ vec![
+ // Children 1
+ (
+ // Orderings
+ vec![vec!["a", "b"], vec!["c"]],
+ Arc::clone(&schema),
+ ),
+ // Children 2
+ (
+ // Orderings
+ vec![vec!["a1", "b1"], vec!["c1"]],
+ Arc::clone(&schema2),
+ ),
+ ],
+ // Expected
+ vec![vec!["a", "b"], vec!["c"]],
+ ),
+ ];
+ for (children, expected) in test_cases {
+ let children_eqs = children
+ .iter()
+ .map(|(orderings, schema)| {
+ let orderings = orderings
+ .iter()
+ .map(|ordering| {
+ ordering
+ .iter()
+ .map(|name| PhysicalSortExpr {
+ expr: col(name, schema).unwrap(),
+ options: SortOptions::default(),
+ })
+ .collect::<Vec<_>>()
+ })
+ .collect::<Vec<_>>();
+ EquivalenceProperties::new_with_orderings(
+ Arc::clone(schema),
+ &orderings,
+ )
+ })
+ .collect::<Vec<_>>();
+ let actual = calculate_union(children_eqs, Arc::clone(&schema))?;
+
+ let expected_ordering = expected
+ .into_iter()
+ .map(|ordering| {
+ ordering
+ .into_iter()
+ .map(|name| PhysicalSortExpr {
+ expr: col(name, &schema).unwrap(),
+ options: SortOptions::default(),
+ })
+ .collect::<Vec<_>>()
+ })
+ .collect::<Vec<_>>();
+ let expected = EquivalenceProperties::new_with_orderings(
+ Arc::clone(&schema),
+ &expected_ordering,
+ );
+ assert_eq_properties_same(
+ &actual,
+ &expected,
+ format!("expected: {:?}, actual: {:?}", expected, actual),
+ );
+ }
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_union_equivalence_properties_binary() -> Result<()> {
+ let schema = create_test_schema()?;
+ let schema2 = append_fields(&schema, "1");
+ let col_a = &col("a", &schema)?;
+ let col_b = &col("b", &schema)?;
+ let col_c = &col("c", &schema)?;
+ let col_a1 = &col("a1", &schema2)?;
+ let col_b1 = &col("b1", &schema2)?;
+ let options = SortOptions::default();
+ let options_desc = !SortOptions::default();
+ let test_cases = [
+ //-----------TEST CASE 1----------//
+ (
+ (
+ // First child orderings
+ vec![
+ // [a ASC]
+ (vec![(col_a, options)]),
+ ],
+ // First child constants
+ vec![col_b, col_c],
+ Arc::clone(&schema),
+ ),
+ (
+ // Second child orderings
+ vec![
+ // [b ASC]
+ (vec![(col_b, options)]),
+ ],
+ // Second child constants
+ vec![col_a, col_c],
+ Arc::clone(&schema),
+ ),
+ (
+ // Union expected orderings
+ vec![
+ // [a ASC]
+ vec![(col_a, options)],
+ // [b ASC]
+ vec![(col_b, options)],
+ ],
+                // Union constants
+ vec![col_c],
+ ),
+ ),
+ //-----------TEST CASE 2----------//
+ // Meet ordering between [a ASC], [a ASC, b ASC] should be [a ASC]
+ (
+ (
+ // First child orderings
+ vec![
+ // [a ASC]
+ vec![(col_a, options)],
+ ],
+ // No constant
+ vec![],
+ Arc::clone(&schema),
+ ),
+ (
+ // Second child orderings
+ vec![
+ // [a ASC, b ASC]
+ vec![(col_a, options), (col_b, options)],
+ ],
+ // No constant
+ vec![],
+ Arc::clone(&schema),
+ ),
+ (
+ // Union orderings
+ vec![
+ // [a ASC]
+ vec![(col_a, options)],
+ ],
+ // No constant
+ vec![],
+ ),
+ ),
+ //-----------TEST CASE 3----------//
+ // Meet ordering between [a ASC], [a DESC] should be []
+ (
+ (
+ // First child orderings
+ vec![
+ // [a ASC]
+ vec![(col_a, options)],
+ ],
+ // No constant
+ vec![],
+ Arc::clone(&schema),
+ ),
+ (
+ // Second child orderings
+ vec![
+ // [a DESC]
+ vec![(col_a, options_desc)],
+ ],
+ // No constant
+ vec![],
+ Arc::clone(&schema),
+ ),
+ (
+ // Union doesn't have any ordering
+ vec![],
+ // No constant
+ vec![],
+ ),
+ ),
+ //-----------TEST CASE 4----------//
+ // Meet ordering between [a ASC], [a1 ASC, b1 ASC] should be [a
ASC]
+            // Where a and a1 are at the same index for their corresponding
schemas.
+ (
+ (
+ // First child orderings
+ vec![
+ // [a ASC]
+ vec![(col_a, options)],
+ ],
+ // No constant
+ vec![],
+ Arc::clone(&schema),
+ ),
+ (
+ // Second child orderings
+ vec![
+ // [a1 ASC, b1 ASC]
+ vec![(col_a1, options), (col_b1, options)],
+ ],
+ // No constant
+ vec![],
+ Arc::clone(&schema2),
+ ),
+ (
+ // Union orderings
+ vec![
+ // [a ASC]
+ vec![(col_a, options)],
+ ],
+ // No constant
+ vec![],
+ ),
+ ),
+ ];
+
+ for (
+ test_idx,
+ (
+ (first_child_orderings, first_child_constants, first_schema),
+ (second_child_orderings, second_child_constants,
second_schema),
+ (union_orderings, union_constants),
+ ),
+ ) in test_cases.iter().enumerate()
+ {
+ let first_orderings = first_child_orderings
+ .iter()
+ .map(|ordering| convert_to_sort_exprs(ordering))
+ .collect::<Vec<_>>();
+ let first_constants = first_child_constants
+ .iter()
+ .map(|expr| ConstExpr::new(Arc::clone(expr)))
+ .collect::<Vec<_>>();
+ let mut lhs = EquivalenceProperties::new(Arc::clone(first_schema));
+ lhs = lhs.add_constants(first_constants);
+ lhs.add_new_orderings(first_orderings);
+
+ let second_orderings = second_child_orderings
+ .iter()
+ .map(|ordering| convert_to_sort_exprs(ordering))
+ .collect::<Vec<_>>();
+ let second_constants = second_child_constants
+ .iter()
+ .map(|expr| ConstExpr::new(Arc::clone(expr)))
+ .collect::<Vec<_>>();
+ let mut rhs =
EquivalenceProperties::new(Arc::clone(second_schema));
+ rhs = rhs.add_constants(second_constants);
+ rhs.add_new_orderings(second_orderings);
+
+ let union_expected_orderings = union_orderings
+ .iter()
+ .map(|ordering| convert_to_sort_exprs(ordering))
+ .collect::<Vec<_>>();
+ let union_constants = union_constants
+ .iter()
+ .map(|expr| ConstExpr::new(Arc::clone(expr)))
+ .collect::<Vec<_>>();
+ let mut union_expected_eq =
EquivalenceProperties::new(Arc::clone(&schema));
+ union_expected_eq =
union_expected_eq.add_constants(union_constants);
+ union_expected_eq.add_new_orderings(union_expected_orderings);
+
+ let actual_union_eq = calculate_union_binary(lhs, rhs)?;
+ let err_msg = format!(
+ "Error in test id: {:?}, test case: {:?}",
+ test_idx, test_cases[test_idx]
+ );
+ assert_eq_properties_same(&actual_union_eq, &union_expected_eq,
err_msg);
+ }
+ Ok(())
+ }
+
+ fn assert_eq_properties_same(
+ lhs: &EquivalenceProperties,
+ rhs: &EquivalenceProperties,
+ err_msg: String,
+ ) {
+ // Check whether constants are same
+ let lhs_constants = lhs.constants();
+ let rhs_constants = rhs.constants();
+ assert_eq!(lhs_constants.len(), rhs_constants.len(), "{}", err_msg);
+ for rhs_constant in rhs_constants {
+ assert!(
+ const_exprs_contains(lhs_constants, rhs_constant.expr()),
+ "{}",
+ err_msg
+ );
+ }
+
+ // Check whether orderings are same.
+ let lhs_orderings = lhs.oeq_class();
+ let rhs_orderings = &rhs.oeq_class.orderings;
+ assert_eq!(lhs_orderings.len(), rhs_orderings.len(), "{}", err_msg);
+ for rhs_ordering in rhs_orderings {
+ assert!(lhs_orderings.contains(rhs_ordering), "{}", err_msg);
+ }
+ }
}
diff --git a/datafusion/physical-expr/src/lib.rs
b/datafusion/physical-expr/src/lib.rs
index 4f83ae0195..2e78119eba 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -48,7 +48,7 @@ pub use analysis::{analyze, AnalysisContext, ExprBoundaries};
pub use datafusion_physical_expr_common::aggregate::{
AggregateExpr, AggregatePhysicalExpressions,
};
-pub use equivalence::{ConstExpr, EquivalenceProperties};
+pub use equivalence::{calculate_union, ConstExpr, EquivalenceProperties};
pub use partitioning::{Distribution, Partitioning};
pub use physical_expr::{
physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
diff --git a/datafusion/physical-plan/src/common.rs
b/datafusion/physical-plan/src/common.rs
index bf9d14e73d..4b5eea6b76 100644
--- a/datafusion/physical-plan/src/common.rs
+++ b/datafusion/physical-plan/src/common.rs
@@ -22,9 +22,9 @@ use std::fs::{metadata, File};
use std::path::{Path, PathBuf};
use std::sync::Arc;
-use super::{ExecutionPlanProperties, SendableRecordBatchStream};
+use super::SendableRecordBatchStream;
use crate::stream::RecordBatchReceiverStream;
-use crate::{ColumnStatistics, ExecutionPlan, Statistics};
+use crate::{ColumnStatistics, Statistics};
use arrow::datatypes::Schema;
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
@@ -33,8 +33,6 @@ use arrow_array::Array;
use datafusion_common::stats::Precision;
use datafusion_common::{plan_err, DataFusionError, Result};
use datafusion_execution::memory_pool::MemoryReservation;
-use datafusion_physical_expr::expressions::{BinaryExpr, Column};
-use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use futures::{StreamExt, TryStreamExt};
use parking_lot::Mutex;
@@ -178,71 +176,6 @@ pub fn compute_record_batch_statistics(
}
}
-/// Calculates the "meet" of given orderings.
-/// The meet is the finest ordering that satisfied by all the given
-/// orderings, see <https://en.wikipedia.org/wiki/Join_and_meet>.
-pub fn get_meet_of_orderings(
- given: &[Arc<dyn ExecutionPlan>],
-) -> Option<&[PhysicalSortExpr]> {
- given
- .iter()
- .map(|item| item.output_ordering())
- .collect::<Option<Vec<_>>>()
- .and_then(get_meet_of_orderings_helper)
-}
-
-fn get_meet_of_orderings_helper(
- orderings: Vec<&[PhysicalSortExpr]>,
-) -> Option<&[PhysicalSortExpr]> {
- let mut idx = 0;
- let first = orderings[0];
- loop {
- for ordering in orderings.iter() {
- if idx >= ordering.len() {
- return Some(ordering);
- } else {
- let schema_aligned = check_expr_alignment(
- ordering[idx].expr.as_ref(),
- first[idx].expr.as_ref(),
- );
- if !schema_aligned || (ordering[idx].options !=
first[idx].options) {
- // In a union, the output schema is that of the first
child (by convention).
- // Therefore, generate the result from the first child's
schema:
- return if idx > 0 { Some(&first[..idx]) } else { None };
- }
- }
- }
- idx += 1;
- }
-
- fn check_expr_alignment(first: &dyn PhysicalExpr, second: &dyn
PhysicalExpr) -> bool {
- match (
- first.as_any().downcast_ref::<Column>(),
- second.as_any().downcast_ref::<Column>(),
- first.as_any().downcast_ref::<BinaryExpr>(),
- second.as_any().downcast_ref::<BinaryExpr>(),
- ) {
- (Some(first_col), Some(second_col), _, _) => {
- first_col.index() == second_col.index()
- }
- (_, _, Some(first_binary), Some(second_binary)) => {
- if first_binary.op() == second_binary.op() {
- check_expr_alignment(
- first_binary.left().as_ref(),
- second_binary.left().as_ref(),
- ) && check_expr_alignment(
- first_binary.right().as_ref(),
- second_binary.right().as_ref(),
- )
- } else {
- false
- }
- }
- (_, _, _, _) => false,
- }
- }
-}
-
/// Write in Arrow IPC format.
pub struct IPCWriter {
/// path
@@ -342,297 +275,12 @@ pub fn can_project(
#[cfg(test)]
mod tests {
- use std::ops::Not;
-
use super::*;
- use crate::memory::MemoryExec;
- use crate::sorts::sort::SortExec;
- use crate::union::UnionExec;
- use arrow::compute::SortOptions;
use arrow::{
array::{Float32Array, Float64Array, UInt64Array},
datatypes::{DataType, Field},
};
- use datafusion_expr::Operator;
- use datafusion_physical_expr::expressions::col;
-
- #[test]
- fn get_meet_of_orderings_helper_common_prefix_test() -> Result<()> {
- let input1: Vec<PhysicalSortExpr> = vec![
- PhysicalSortExpr {
- expr: Arc::new(Column::new("a", 0)),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("b", 1)),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("c", 2)),
- options: SortOptions::default(),
- },
- ];
-
- let input2: Vec<PhysicalSortExpr> = vec![
- PhysicalSortExpr {
- expr: Arc::new(Column::new("x", 0)),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("y", 1)),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("z", 2)),
- options: SortOptions::default(),
- },
- ];
-
- let input3: Vec<PhysicalSortExpr> = vec![
- PhysicalSortExpr {
- expr: Arc::new(Column::new("d", 0)),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("e", 1)),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("f", 2)),
- options: SortOptions::default(),
- },
- ];
-
- let input4: Vec<PhysicalSortExpr> = vec![
- PhysicalSortExpr {
- expr: Arc::new(Column::new("g", 0)),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("h", 1)),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- // Note that index of this column is not 2. Hence this 3rd
entry shouldn't be
- // in the output ordering.
- expr: Arc::new(Column::new("i", 3)),
- options: SortOptions::default(),
- },
- ];
-
- let expected = vec![
- PhysicalSortExpr {
- expr: Arc::new(Column::new("a", 0)),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("b", 1)),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("c", 2)),
- options: SortOptions::default(),
- },
- ];
- let result = get_meet_of_orderings_helper(vec![&input1, &input2,
&input3]);
- assert_eq!(result.unwrap(), expected);
-
- let expected = vec![
- PhysicalSortExpr {
- expr: Arc::new(Column::new("a", 0)),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("b", 1)),
- options: SortOptions::default(),
- },
- ];
- let result = get_meet_of_orderings_helper(vec![&input1, &input2,
&input4]);
- assert_eq!(result.unwrap(), expected);
- Ok(())
- }
-
- #[test]
- fn get_meet_of_orderings_helper_subset_test() -> Result<()> {
- let input1: Vec<PhysicalSortExpr> = vec![
- PhysicalSortExpr {
- expr: Arc::new(Column::new("a", 0)),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("b", 1)),
- options: SortOptions::default(),
- },
- ];
-
- let input2: Vec<PhysicalSortExpr> = vec![
- PhysicalSortExpr {
- expr: Arc::new(Column::new("c", 0)),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("d", 1)),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("e", 2)),
- options: SortOptions::default(),
- },
- ];
-
- let input3: Vec<PhysicalSortExpr> = vec![
- PhysicalSortExpr {
- expr: Arc::new(Column::new("f", 0)),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("g", 1)),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("h", 2)),
- options: SortOptions::default(),
- },
- ];
-
- let result = get_meet_of_orderings_helper(vec![&input1, &input2,
&input3]);
- assert_eq!(result.unwrap(), input1);
- Ok(())
- }
-
- #[test]
- fn get_meet_of_orderings_helper_no_overlap_test() -> Result<()> {
- let input1: Vec<PhysicalSortExpr> = vec![
- PhysicalSortExpr {
- expr: Arc::new(Column::new("a", 0)),
- // Since ordering is conflicting with other inputs
- // output ordering should be empty
- options: SortOptions::default().not(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("b", 1)),
- options: SortOptions::default(),
- },
- ];
-
- let input2: Vec<PhysicalSortExpr> = vec![
- PhysicalSortExpr {
- expr: Arc::new(Column::new("x", 0)),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("a", 1)),
- options: SortOptions::default(),
- },
- ];
-
- let input3: Vec<PhysicalSortExpr> = vec![
- PhysicalSortExpr {
- expr: Arc::new(Column::new("a", 2)),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("y", 1)),
- options: SortOptions::default(),
- },
- ];
-
- let result = get_meet_of_orderings_helper(vec![&input1, &input2]);
- assert!(result.is_none());
-
- let result = get_meet_of_orderings_helper(vec![&input2, &input3]);
- assert!(result.is_none());
-
- let result = get_meet_of_orderings_helper(vec![&input1, &input3]);
- assert!(result.is_none());
- Ok(())
- }
-
- #[test]
- fn get_meet_of_orderings_helper_binary_exprs() -> Result<()> {
- let input1: Vec<PhysicalSortExpr> = vec![
- PhysicalSortExpr {
- expr: Arc::new(BinaryExpr::new(
- Arc::new(Column::new("a", 0)),
- Operator::Plus,
- Arc::new(Column::new("b", 1)),
- )),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("c", 2)),
- options: SortOptions::default(),
- },
- ];
-
- let input2: Vec<PhysicalSortExpr> = vec![
- PhysicalSortExpr {
- expr: Arc::new(BinaryExpr::new(
- Arc::new(Column::new("x", 0)),
- Operator::Plus,
- Arc::new(Column::new("y", 1)),
- )),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("z", 2)),
- options: SortOptions::default(),
- },
- ];
-
- // erroneous input
- let input3: Vec<PhysicalSortExpr> = vec![
- PhysicalSortExpr {
- expr: Arc::new(BinaryExpr::new(
- Arc::new(Column::new("a", 1)),
- Operator::Plus,
- Arc::new(Column::new("b", 0)),
- )),
- options: SortOptions::default(),
- },
- PhysicalSortExpr {
- expr: Arc::new(Column::new("c", 2)),
- options: SortOptions::default(),
- },
- ];
-
- let result = get_meet_of_orderings_helper(vec![&input1, &input2]);
- assert_eq!(input1, result.unwrap());
-
- let result = get_meet_of_orderings_helper(vec![&input2, &input3]);
- assert!(result.is_none());
-
- let result = get_meet_of_orderings_helper(vec![&input1, &input3]);
- assert!(result.is_none());
- Ok(())
- }
-
- #[test]
- fn test_meet_of_orderings() -> Result<()> {
- let schema = Arc::new(Schema::new(vec![
- Field::new("f32", DataType::Float32, false),
- Field::new("f64", DataType::Float64, false),
- ]));
- let sort_expr = vec![PhysicalSortExpr {
- expr: col("f32", &schema).unwrap(),
- options: SortOptions::default(),
- }];
- let memory_exec =
- Arc::new(MemoryExec::try_new(&[], Arc::clone(&schema), None)?) as
_;
- let sort_exec = Arc::new(SortExec::new(sort_expr.clone(), memory_exec))
- as Arc<dyn ExecutionPlan>;
- let memory_exec2 = Arc::new(MemoryExec::try_new(&[], schema, None)?)
as _;
- // memory_exec2 doesn't have output ordering
- let union_exec = UnionExec::new(vec![Arc::clone(&sort_exec),
memory_exec2]);
- let res = get_meet_of_orderings(union_exec.inputs());
- assert!(res.is_none());
-
- let union_exec = UnionExec::new(vec![Arc::clone(&sort_exec),
sort_exec]);
- let res = get_meet_of_orderings(union_exec.inputs());
- assert_eq!(res, Some(&sort_expr[..]));
- Ok(())
- }
#[test]
fn test_compute_record_batch_statistics_empty() -> Result<()> {
diff --git a/datafusion/physical-plan/src/union.rs
b/datafusion/physical-plan/src/union.rs
index 24c80048ab..9321fdb2ca 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -41,7 +41,7 @@ use arrow::record_batch::RecordBatch;
use datafusion_common::stats::Precision;
use datafusion_common::{exec_err, internal_err, Result};
use datafusion_execution::TaskContext;
-use datafusion_physical_expr::{ConstExpr, EquivalenceProperties};
+use datafusion_physical_expr::{calculate_union, EquivalenceProperties};
use futures::Stream;
use itertools::Itertools;
@@ -99,7 +99,12 @@ impl UnionExec {
/// Create a new UnionExec
pub fn new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Self {
let schema = union_schema(&inputs);
- let cache = Self::compute_properties(&inputs, schema);
+ // The schema of the inputs and the union schema is consistent when:
+ // - They have the same number of fields, and
+ // - Their fields have same types at the same indices.
+ // Here, we know that schemas are consistent and the call below can
+ // not return an error.
+ let cache = Self::compute_properties(&inputs, schema).unwrap();
UnionExec {
inputs,
metrics: ExecutionPlanMetricsSet::new(),
@@ -116,13 +121,13 @@ impl UnionExec {
fn compute_properties(
inputs: &[Arc<dyn ExecutionPlan>],
schema: SchemaRef,
- ) -> PlanProperties {
+ ) -> Result<PlanProperties> {
// Calculate equivalence properties:
- let children_eqs = inputs
+ let children_eqps = inputs
.iter()
- .map(|child| child.equivalence_properties())
+ .map(|child| child.equivalence_properties().clone())
.collect::<Vec<_>>();
- let eq_properties = calculate_union_eq_properties(&children_eqs,
schema);
+ let eq_properties = calculate_union(children_eqps, schema)?;
// Calculate output partitioning; i.e. sum output partitions of the
inputs.
let num_partitions = inputs
@@ -134,71 +139,13 @@ impl UnionExec {
// Determine execution mode:
let mode = execution_mode_from_children(inputs.iter());
- PlanProperties::new(eq_properties, output_partitioning, mode)
+ Ok(PlanProperties::new(
+ eq_properties,
+ output_partitioning,
+ mode,
+ ))
}
}
-/// Calculate `EquivalenceProperties` for `UnionExec` from the
`EquivalenceProperties`
-/// of its children.
-fn calculate_union_eq_properties(
- children_eqs: &[&EquivalenceProperties],
- schema: SchemaRef,
-) -> EquivalenceProperties {
- // Calculate equivalence properties:
- // TODO: In some cases, we should be able to preserve some equivalence
- // classes and constants. Add support for such cases.
- let mut eq_properties = EquivalenceProperties::new(schema);
- // Use the ordering equivalence class of the first child as the seed:
- let mut meets = children_eqs[0]
- .oeq_class()
- .iter()
- .map(|item| item.to_vec())
- .collect::<Vec<_>>();
- // Iterate over all the children:
- for child_eqs in &children_eqs[1..] {
- // Compute meet orderings of the current meets and the new ordering
- // equivalence class.
- let mut idx = 0;
- while idx < meets.len() {
- // Find all the meets of `current_meet` with this child's
orderings:
- let valid_meets =
child_eqs.oeq_class().iter().filter_map(|ordering| {
- child_eqs.get_meet_ordering(ordering, &meets[idx])
- });
- // Use the longest of these meets as others are redundant:
- if let Some(next_meet) = valid_meets.max_by_key(|m| m.len()) {
- meets[idx] = next_meet;
- idx += 1;
- } else {
- meets.swap_remove(idx);
- }
- }
- }
- // We know have all the valid orderings after union, remove redundant
- // entries (implicitly) and return:
- eq_properties.add_new_orderings(meets);
-
- let mut meet_constants = children_eqs[0].constants().to_vec();
- // Iterate over all the children:
- for child_eqs in &children_eqs[1..] {
- let constants = child_eqs.constants();
- meet_constants = meet_constants
- .into_iter()
- .filter_map(|meet_constant| {
- for const_expr in constants {
- if const_expr.expr().eq(meet_constant.expr()) {
- // TODO: Check whether constant expressions evaluates
the same value or not for each partition
- let across_partitions = false;
- return Some(
- ConstExpr::from(meet_constant.owned_expr())
- .with_across_partitions(across_partitions),
- );
- }
- }
- None
- })
- .collect::<Vec<_>>();
- }
- eq_properties.add_constants(meet_constants)
-}
impl DisplayAs for UnionExec {
fn fmt_as(
@@ -639,8 +586,8 @@ mod tests {
use arrow_schema::{DataType, SortOptions};
use datafusion_common::ScalarValue;
- use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
+ use datafusion_physical_expr_common::expressions::column::col;
// Generate a schema which consists of 7 columns (a, b, c, d, e, f, g)
fn create_test_schema() -> Result<SchemaRef> {
@@ -856,23 +803,31 @@ mod tests {
.with_sort_information(second_orderings),
);
+ let mut union_expected_eq =
EquivalenceProperties::new(Arc::clone(&schema));
+ union_expected_eq.add_new_orderings(union_expected_orderings);
+
let union = UnionExec::new(vec![child1, child2]);
let union_eq_properties =
union.properties().equivalence_properties();
- let union_actual_orderings = union_eq_properties.oeq_class();
let err_msg = format!(
"Error in test id: {:?}, test case: {:?}",
test_idx, test_cases[test_idx]
);
- assert_eq!(
- union_actual_orderings.len(),
- union_expected_orderings.len(),
- "{}",
- err_msg
- );
- for expected in &union_expected_orderings {
- assert!(union_actual_orderings.contains(expected), "{}",
err_msg);
- }
+ assert_eq_properties_same(union_eq_properties, &union_expected_eq,
err_msg);
}
Ok(())
}
+
+ fn assert_eq_properties_same(
+ lhs: &EquivalenceProperties,
+ rhs: &EquivalenceProperties,
+ err_msg: String,
+ ) {
+ // Check whether orderings are same.
+ let lhs_orderings = lhs.oeq_class();
+ let rhs_orderings = &rhs.oeq_class.orderings;
+ assert_eq!(lhs_orderings.len(), rhs_orderings.len(), "{}", err_msg);
+ for rhs_ordering in rhs_orderings {
+ assert!(lhs_orderings.contains(rhs_ordering), "{}", err_msg);
+ }
+ }
}
diff --git a/datafusion/sqllogictest/test_files/order.slt
b/datafusion/sqllogictest/test_files/order.slt
index 51de40fb19..1aeaf9b76d 100644
--- a/datafusion/sqllogictest/test_files/order.slt
+++ b/datafusion/sqllogictest/test_files/order.slt
@@ -1132,3 +1132,10 @@ physical_plan
02)--ProjectionExec: expr=[CAST(inc_col@0 > desc_col@1 AS Int32) as c]
03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
04)------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]},
projection=[inc_col, desc_col], output_orderings=[[inc_col@0 ASC NULLS LAST],
[desc_col@1 DESC]], has_header=true
+
+# Union a query with the actual data and one with a constant
+query I
+SELECT (SELECT c from ordered_table ORDER BY c LIMIT 1) UNION ALL (SELECT 23
as c from ordered_table ORDER BY c LIMIT 1) ORDER BY c;
+----
+0
+23
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]