andygrove commented on code in PR #2813:
URL: https://github.com/apache/arrow-datafusion/pull/2813#discussion_r911146230
##########
datafusion/optimizer/src/subquery_decorrelate.rs:
##########
@@ -0,0 +1,168 @@
+use crate::{utils, OptimizerConfig, OptimizerRule};
+use datafusion_common::Column;
+use datafusion_expr::logical_plan::{Filter, JoinType, Subquery};
+use datafusion_expr::{combine_filters, Expr, LogicalPlan, LogicalPlanBuilder,
Operator};
+use hashbrown::HashSet;
+use itertools::{Either, Itertools};
+use std::sync::Arc;
+
+/// Optimizer rule for rewriting subquery filters to joins
+#[derive(Default)]
+pub struct SubqueryDecorrelate {}
+
+impl SubqueryDecorrelate {
+ #[allow(missing_docs)]
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl OptimizerRule for SubqueryDecorrelate {
+ fn optimize(
+ &self,
+ plan: &LogicalPlan,
+ optimizer_config: &OptimizerConfig,
+ ) -> datafusion_common::Result<LogicalPlan> {
+ match plan {
+ LogicalPlan::Filter(Filter { predicate, input }) => {
+ match predicate {
+ // TODO: arbitrary expressions
+ Expr::Exists { subquery, negated } => {
+ if *negated {
+ return Ok(plan.clone());
+ }
+ optimize_exists(plan, subquery, input)
+ }
+ _ => Ok(plan.clone()),
+ }
+ }
+ _ => {
+ // Apply the optimization to all inputs of the plan
+ utils::optimize_children(self, plan, optimizer_config)
+ }
+ }
+ }
+
+ fn name(&self) -> &str {
+ "subquery_decorrelate"
+ }
+}
+
+/// Takes a query like:
+///
+/// select c.id from customers c where exists (select * from orders o where
o.c_id = c.id)
+///
+/// and optimizes it into:
+///
+/// select c.id from customers c
+/// inner join (select o.c_id from orders o group by o.c_id) o on o.c_id =
c.id
+fn optimize_exists(
+ plan: &LogicalPlan,
+ subquery: &Subquery,
+ input: &Arc<LogicalPlan>,
+) -> datafusion_common::Result<LogicalPlan> {
+ // Only operate if there is one input
+ let sub_inputs = subquery.subquery.inputs();
+ if sub_inputs.len() != 1 {
+ return Ok(plan.clone());
+ }
+ let sub_input = if let Some(i) = sub_inputs.get(0) {
+ i
+ } else {
+ return Ok(plan.clone());
+ };
+
+ // Only operate on subqueries that are trying to filter on an expression
from an outer query
+ let filter = if let LogicalPlan::Filter(f) = sub_input {
+ f
+ } else {
+ return Ok(plan.clone());
+ };
+
+ // split into filters
+ let mut filters = vec![];
+ utils::split_conjunction(&filter.predicate, &mut filters);
+
+ // get names of fields TODO: Must fully qualify these!
+ let fields: HashSet<_> = sub_input
+ .schema()
+ .fields()
+ .iter()
+ .map(|f| f.name())
+ .collect();
Review Comment:
You should be able to get a hashset of qualified names like this:
```suggestion
let fields = HashSet::from_iter(sub_input
.schema()
.field_names());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]