avantgardnerio commented on code in PR #2885:
URL: https://github.com/apache/arrow-datafusion/pull/2885#discussion_r919327114


##########
datafusion/optimizer/src/utils.rs:
##########
@@ -82,6 +85,127 @@ pub fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan {
     })
 }
 
+/// Looks for correlating expressions: equality expressions with one field 
from the subquery, and
+/// one not in the subquery (closed upon from outer scope)
+///
+/// # Arguments
+///
+/// * `exprs` - List of expressions that may or may not be joins
+/// * `fields` - HashSet of fully qualified (table.col) fields in subquery 
schema
+///
+/// # Return value
+///
+/// Tuple of (expressions containing joins, remaining non-join expressions)
+pub fn find_join_exprs(
+    exprs: Vec<&Expr>,
+    schema: &DFSchemaRef,
+) -> (Vec<Expr>, Vec<Expr>) {
+    let fields: HashSet<_> = schema
+        .fields()
+        .iter()
+        .map(|it| it.qualified_name())
+        .collect();
+
+    let (joins, others): (Vec<_>, Vec<_>) = 
exprs.iter().partition_map(|filter| {
+        let (left, op, right) = match filter {
+            Expr::BinaryExpr { left, op, right } => (*left.clone(), *op, 
*right.clone()),
+            _ => return Either::Right((*filter).clone()), // not a 
column=column expression
+        };
+        match op {
+            Operator::Eq => {}
+            Operator::NotEq => {}
+            _ => return Either::Right((*filter).clone()), // not a 
column=column expression
+        }
+        let left = match left {
+            Expr::Column(c) => c,
+            _ => return Either::Right((*filter).clone()), // not a 
column=column expression
+        };
+        let right = match right {
+            Expr::Column(c) => c,
+            _ => return Either::Right((*filter).clone()), // not a 
column=column expression
+        };
+        if fields.contains(&left.flat_name()) && 
fields.contains(&right.flat_name()) {
+            return Either::Right((*filter).clone()); // both columns present 
(none closed-upon)
+        }
+        if !fields.contains(&left.flat_name()) && 
!fields.contains(&right.flat_name()) {
+            return Either::Right((*filter).clone()); // neither column present 
(syntax error?)
+        }
+
+        Either::Left((*filter).clone())
+    });
+
+    (joins, others)
+}
+
+/// Extracts correlating columns from expressions
+///
+/// # Arguments
+///
+/// * `exprs` - List of expressions that correlate a subquery to an outer scope
+/// * `fields` - HashSet of fully qualified (table.col) fields in subquery 
schema
+///
+/// # Return value
+///
+/// Tuple of tuples ((outer-scope cols, subquery cols), non-equal expressions)
+pub fn exprs_to_join_cols(
+    exprs: &[Expr],
+    schema: &DFSchemaRef,
+    include_negated: bool,
+) -> Result<(Vec<Column>, Vec<Column>, Option<Expr>)> {
+    let fields: HashSet<_> = schema
+        .fields()
+        .iter()
+        .map(|it| it.qualified_name())
+        .collect();
+
+    let mut joins: Vec<(String, String)> = vec![];
+    let mut others: Vec<Expr> = vec![];
+    for filter in exprs.iter() {
+        let (left, op, right) = match filter {
+            Expr::BinaryExpr { left, op, right } => (*left.clone(), *op, 
*right.clone()),
+            _ => Err(DataFusionError::Plan("Invalid 
expression!".to_string()))?,
+        };
+        match op {
+            Operator::Eq => {}
+            Operator::NotEq => {
+                if !include_negated {
+                    others.push((*filter).clone());
+                    continue;
+                }
+            }
+            _ => Err(DataFusionError::Plan("Invalid 
expression!".to_string()))?,
+        }
+        let left = match left {
+            Expr::Column(c) => c,
+            _ => Err(DataFusionError::Plan("Invalid 
expression!".to_string()))?,
+        };
+        let right = match right {
+            Expr::Column(c) => c,
+            _ => Err(DataFusionError::Plan("Invalid 
expression!".to_string()))?,
+        };
+        let sorted = if fields.contains(&left.flat_name()) {
+            (right.flat_name(), left.flat_name())
+        } else {
+            (left.flat_name(), right.flat_name())
+        };
+        joins.push(sorted);
+    }
+
+    let right_cols: Vec<_> = joins
+        .iter()
+        .map(|it| &it.1)
+        .map(|it| Column::from(it.as_str()))
+        .collect();
+    let left_cols: Vec<_> = joins
+        .iter()
+        .map(|it| &it.0)
+        .map(|it| Column::from(it.as_str()))

Review Comment:
   Thanks!!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to