AssHero commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r906177098


##########
datafusion/optimizer/src/reduce_outer_join.rs:
##########
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Optimizer rule to reduce left/right/full join to inner join if possible.
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::{Column, DFSchema, Result};
+use datafusion_expr::{
+    logical_plan::{Filter, Join, JoinType, LogicalPlan, Projection},
+    utils::from_plan,
+};
+use datafusion_expr::{Expr, Operator};
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+/// Optimizer rule that rewrites LEFT/RIGHT/FULL joins into a more restrictive
+/// join type (INNER, LEFT or RIGHT) when filter predicates above the join
+/// would discard the null-extended rows the outer join produces.
+#[derive(Default)]
+pub struct ReduceOuterJoin;
+
+impl ReduceOuterJoin {
+    /// Creates a new [`ReduceOuterJoin`] optimizer rule.
+    pub fn new() -> Self {
+        Self
+    }
+}
+
+impl OptimizerRule for ReduceOuterJoin {
+    /// Recursively rewrites `plan`, starting with no columns known to be
+    /// non-nullable; outer joins are narrowed wherever the walk finds
+    /// predicates that reject NULLs from a join side.
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        // Filled in by the recursive walk as filter predicates are visited.
+        let mut known_nonnull = Vec::<Column>::new();
+        reduce_outer_join(self, plan, &mut known_nonnull, optimizer_config)
+    }
+
+    /// Returns this rule's identifier.
+    fn name(&self) -> &str {
+        "reduce_outer_join"
+    }
+}
+
+/// Attempt to reduce outer joins to inner joins.
+/// For a query like: select ... from a left join b on ... where b.xx = 100;
+/// if b.xx is null, then b.xx = 100 does not evaluate to true, so the filter
+/// removes those null-extended rows.
+/// Therefore, there is no need to produce null rows for output, and we can use
+/// an inner join instead of the left join.
+///
+/// Generally, an outer join can be reduced to an inner join if the quals from
+/// the where clause return false (or null) whenever any of their inputs are null,
+/// and the columns of those quals come from the nullable side of the outer join.
+/// A full join whose nullable columns are constrained on only one side is
+/// reduced to a left or right join instead.
+fn reduce_outer_join(
+    _optimizer: &ReduceOuterJoin,
+    plan: &LogicalPlan,
+    nonnullable_cols: &mut Vec<Column>,
+    _optimizer_config: &OptimizerConfig,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Filter(Filter { input, predicate }) => match &**input {
+            LogicalPlan::Join(join) => {
+                extract_nonnullable_columns(
+                    predicate,
+                    nonnullable_cols,
+                    join.left.schema(),
+                    join.right.schema(),
+                    true,
+                )?;
+                Ok(LogicalPlan::Filter(Filter {
+                    predicate: predicate.clone(),
+                    input: Arc::new(reduce_outer_join(
+                        _optimizer,
+                        input,
+                        nonnullable_cols,
+                        _optimizer_config,
+                    )?),
+                }))
+            }
+            _ => Ok(LogicalPlan::Filter(Filter {
+                predicate: predicate.clone(),
+                input: Arc::new(reduce_outer_join(
+                    _optimizer,
+                    input,
+                    nonnullable_cols,
+                    _optimizer_config,
+                )?),
+            })),
+        },
+        LogicalPlan::Join(join) => {
+            let mut new_join_type = join.join_type;
+
+            if join.join_type == JoinType::Left
+                || join.join_type == JoinType::Right
+                || join.join_type == JoinType::Full
+            {
+                let mut left_nonnullable = false;
+                let mut right_nonnullable = false;
+                for col in nonnullable_cols.iter_mut() {
+                    if join.left.schema().field_from_column(col).is_ok() {
+                        left_nonnullable = true;
+                    }
+                    if join.right.schema().field_from_column(col).is_ok() {
+                        right_nonnullable = true;
+                    }
+                }
+
+                match join.join_type {
+                    JoinType::Left => {
+                        if right_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        }
+                    }
+                    JoinType::Right => {
+                        if left_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        }
+                    }
+                    JoinType::Full => {
+                        if left_nonnullable && right_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        } else if left_nonnullable {
+                            new_join_type = JoinType::Left;
+                        } else if right_nonnullable {
+                            new_join_type = JoinType::Right;
+                        }
+                    }
+                    _ => {}
+                };
+            }
+
+            let left_plan = reduce_outer_join(
+                _optimizer,
+                &join.left,
+                nonnullable_cols,
+                _optimizer_config,
+            )?;
+            let right_plan = reduce_outer_join(
+                _optimizer,
+                &join.right,
+                nonnullable_cols,
+                _optimizer_config,
+            )?;
+
+            Ok(LogicalPlan::Join(Join {
+                left: Arc::new(left_plan),
+                right: Arc::new(right_plan),
+                join_type: new_join_type,
+                join_constraint: join.join_constraint,
+                on: join.on.clone(),
+                filter: join.filter.clone(),
+                schema: join.schema.clone(),
+                null_equals_null: join.null_equals_null,
+            }))
+        }
+        LogicalPlan::Projection(Projection {

Review Comment:
   This handles queries such as:
   select * from (select * from tt1 left join tt2 on c1 = c3) a right join tt3 on a.c2 = tt3.c5 where a.c4 < 100;
   
   The predicate on a.c4 can be used to reduce the left join to an inner join, but to do so we need to know that a.c4 corresponds to tt2.c4 through the projection.
   
   I still need to think about the case with a sort.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to