alamb commented on code in PR #4043:
URL: https://github.com/apache/arrow-datafusion/pull/4043#discussion_r1012168379


##########
datafusion/physical-expr/src/utils.rs:
##########
@@ -65,6 +74,210 @@ pub fn sort_expr_list_eq_strict_order(
     list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, 
e2)| e1.eq(e2))
 }
 
+/// Assume the predicate is in the form of CNF, split the predicate to a Vec of PhysicalExprs.
+///
+/// For example, split "a1 = a2 AND b1 <= b2 AND c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"]
+///
+pub fn split_predicate(predicate: &Arc<dyn PhysicalExpr>) -> Vec<&Arc<dyn 
PhysicalExpr>> {
+    match predicate.as_any().downcast_ref::<BinaryExpr>() {
+        Some(binary) => match binary.op() {
+            Operator::And => {
+                let mut vec1 = split_predicate(binary.left());
+                let vec2 = split_predicate(binary.right());
+                vec1.extend(vec2);
+                vec1
+            }
+            _ => vec![predicate],
+        },
+        None => vec![],
+    }
+}
+
+/// Combine the new equal condition with the existing equivalence properties.
+pub fn combine_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    new_condition: (&Column, &Column),
+) {
+    let mut idx1 = -1i32;
+    let mut idx2 = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(new_condition.0);
+        let contains_second = prop.contains(new_condition.1);
+        if contains_first && !contains_second {
+            prop.insert(new_condition.1.clone());
+            idx1 = idx as i32;
+        } else if !contains_first && contains_second {
+            prop.insert(new_condition.0.clone());
+            idx2 = idx as i32;
+        } else if contains_first && contains_second {
+            idx1 = idx as i32;
+            idx2 = idx as i32;
+            break;
+        }
+    }
+
+    if idx1 != -1 && idx2 != -1 && idx1 != idx2 {
+        // need to merge the two existing properties
+        let second_properties = eq_properties.get(idx2 as 
usize).unwrap().clone();
+        let first_properties = eq_properties.get_mut(idx1 as usize).unwrap();
+        for prop in second_properties.iter() {
+            if !first_properties.contains(prop) {
+                first_properties.insert(prop.clone());
+            }
+        }
+        eq_properties.remove(idx2 as usize);
+    } else if idx1 == -1 && idx2 == -1 {
+        // adding new pairs
+        eq_properties.push(EquivalenceProperties::new(
+            new_condition.0.clone(),
+            vec![new_condition.1.clone()],
+        ))
+    }
+}
+
+pub fn remove_equivalence_properties(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    remove_condition: (&Column, &Column),
+) {
+    let mut match_idx = -1i32;
+    for (idx, prop) in eq_properties.iter_mut().enumerate() {
+        let contains_first = prop.contains(remove_condition.0);
+        let contains_second = prop.contains(remove_condition.1);
+        if contains_first && contains_second {
+            match_idx = idx as i32;
+            break;
+        }
+    }
+    if match_idx >= 0 {
+        let matches = eq_properties.get_mut(match_idx as usize).unwrap();
+        matches.remove(remove_condition.0);
+        matches.remove(remove_condition.1);
+        if matches.len() <= 1 {
+            eq_properties.remove(match_idx as usize);
+        }
+    }
+}
+
+pub fn merge_equivalence_properties_with_alias(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    alias_map: &HashMap<Column, Vec<Column>>,
+) {
+    for (column, columns) in alias_map {
+        let mut find_match = false;
+        for (_idx, prop) in eq_properties.iter_mut().enumerate() {
+            if prop.contains(column) {
+                for col in columns {
+                    prop.insert(col.clone());
+                }
+                find_match = true;
+                break;
+            }
+        }
+        if !find_match {
+            eq_properties
+                .push(EquivalenceProperties::new(column.clone(), 
columns.clone()));
+        }
+    }
+}
+
+pub fn truncate_equivalence_properties_not_in_schema(
+    eq_properties: &mut Vec<EquivalenceProperties>,
+    schema: &SchemaRef,
+) {
+    for props in eq_properties.iter_mut() {
+        let mut columns_to_remove = vec![];
+        for column in props.iter() {
+            if let Ok(idx) = schema.index_of(column.name()) {
+                if idx != column.index() {
+                    columns_to_remove.push(column.clone());
+                }
+            } else {
+                columns_to_remove.push(column.clone());
+            }
+        }
+        for column in columns_to_remove {
+            props.remove(&column);
+        }
+    }
+    eq_properties.retain(|props| props.len() > 1);
+}
+
+/// Normalize the output expressions based on Alias Map and SchemaRef.
+///
+/// 1) If there is mapping in Alias Map, replace the Column in the output expressions with the 1st Column in Alias Map
+/// 2) If the Column is invalid for the current Schema, replace the Column with a place holder UnKnownColumn
+///
+pub fn normalize_out_expr_with_alias_schema(
+    expr: Arc<dyn PhysicalExpr>,
+    alias_map: &HashMap<Column, Vec<Column>>,
+    schema: &SchemaRef,
+) -> Arc<dyn PhysicalExpr> {
+    let expr_clone = expr.clone();
+    expr_clone
+        .transform(&|expr| {
+            let normalized_form: Option<Arc<dyn PhysicalExpr>> =
+                match expr.as_any().downcast_ref::<Column>() {
+                    Some(column) => {
+                        let out = alias_map
+                            .get(column)
+                            .map(|c| {
+                                let out_col: Arc<dyn PhysicalExpr> =
+                                    Arc::new(c[0].clone());
+                                out_col
+                            })
+                            .or_else(|| match schema.index_of(column.name()) {
+                                // Exactly matching, return None, no need to 
do the transform
+                                Ok(idx) if column.index() == idx => None,
+                                _ => {
+                                    let out_col: Arc<dyn PhysicalExpr> =
+                                        
Arc::new(UnKnownColumn::new(column.name()));
+                                    Some(out_col)
+                                }
+                            });
+                        out
+                    }
+                    None => None,
+                };
+            normalized_form
+        })
+        .unwrap_or(expr)
+}
+
+pub fn normalize_expr_with_equivalence_properties(
+    expr: Arc<dyn PhysicalExpr>,
+    eq_properties: &[EquivalenceProperties],
+) -> Arc<dyn PhysicalExpr> {
+    let mut normalized = expr.clone();
+    if let Some(column) = expr.as_any().downcast_ref::<Column>() {

Review Comment:
   Does this need to recursively rewrite exprs?
   
   For example, what if `expr` were `A + B` and you had an equivalence class with `B = C`?
   
   Wouldn't you have to rewrite `A + B` into `A + C`? But I don't see this code recursing.
   
   This kind of rewrite could be tested as well, I think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to