irenjj commented on PR #17110:
URL: https://github.com/apache/datafusion/pull/17110#issuecomment-3172677607

   # Decorrelate Subqueries
   
   Complete decorrelation consists of three parts:
   + rewrite dependent join: rewrite_dependent_join.rs
   + decorrelate dependent join: decorrelate_dependent_join.rs
   + eliminate duplicated delim scan: deliminator.rs
   
   ## 1. Rewrite Dependent Join
   
   Rewrite dependent join will rewrite plan_nodes containing subquery 
expressions into dependent joins, to facilitate subsequent processing of 
dependent joins, with the ultimate goal of eliminating dependent joins.
   
   DependentJoinRewriter structure:
   ```rust
   pub struct DependentJoinRewriter {
       // ID of the currently traversed node, incremented on each f_down(), 
decremented on each f_up().
       current_id: usize,
       // Depth of the current DependentJoin,
       // incremented by 1 when encountering nested DependentJoin in f_down(),
       // decremented by 1 when processing DependentJoin in f_up().
       subquery_depth: usize,
       // current_id -> Node
       // Maintains an IndexMap structure that stores all plan_nodes currently 
traversed.
       nodes: IndexMap<usize, Node>,
       // all the node ids from root to the current node
       stack: Vec<usize>,
       // track for each column, the nodes plan that reference to its within 
the tree
       all_outer_ref_columns: IndexMap<Column, Vec<ColumnAccess>>,
   }
   ```
   
   It performs pre-order traversal of the operator tree, then rewrites the 
traversed operators through f_down() and f_up(). It assigns a unique current_id 
to each plan_node when traversing.
   
   ### 1.1 f_down
   
   f_down checks whether there are Subqueries in the expressions of each 
plan_node, and marks plan_nodes containing Subqueries as is_dependent_join_node.
   Subsequently, after traversing to the plan_node of the Subquery, it obtains 
the Subquery type.
   
   ## 2. Decorrelate Dependent Join
   
   Decorrelate dependent join performs decorrelation on the results generated 
by rewrite dependent join.
   
   Decorrelate dependent join is basically implemented according to the code in 
the paper, but with slight differences. The entry function is 
`DecorrelateDepednentJoin::rewrite()`, which constructs a 
DependentJoinDecorrelator in rewrite() and calls the decorrelate() function.
   
   In the decorrelate() function, it performs pre-order traversal of the 
plan_node tree until it finds a DependentJoin:
   ```rust
   fn decorrelate(plan: &LogicalPlan, ...) -> Result<LogicalPlan> {
       if let LogicalPlan::DependentJoin(djoin) = plan {
           // handle dependent join
       } else {
           Ok(plan
               .clone()
               .map_children(|n| Ok(Transformed::yes(self.decorrelate(&n, true, 
0)?)))?
               .data)
       }
   }
   ```
   
   Processing DependentJoin consists of several major steps:
   + Handle Left Child
   + Handle Right Child
   + Construct Join Condition
   
   DependentJoinDecorrelator structure:
   ```rust
   pub struct DependentJoinDecorrelator {
       // All correlated column information for the current depth.
       domains: IndexSet<CorrelatedColumnInfo>,
       // outer table column -> delim scan table column
       correlated_map: IndexMap<Column, Column>,
       // Whether the current DependentJoin is a nested DependentJoin
       is_initial: bool,
   
       // all correlated columns in current depth and downward
       correlated_columns: Vec<CorrelatedColumnInfo>,
       // check if we have to replace any COUNT aggregates into "CASE WHEN X IS 
NULL THEN 0 ELSE COUNT END"
       // store a mapping between a expr and its original index in the logplan 
output
       replacement_map: IndexMap<String, Expr>,
       // if during the top down traversal, we observe any operator that 
requires
       // joining all rows from the lhs with nullable rows on the rhs
       any_join: bool,
       delim_scan_id: usize,
       // All columns of the previously constructed delim scan.
       dscan_cols: Vec<Column>,
   }
   ```
   
   ### 2.1. Handle Left Child
   
   First handle the left child, divided into two cases:
   + The currently processed DependentJoin is not a nested DependentJoin.
   + The currently processed DependentJoin is a nested DependentJoin.
   
   Then based on the mapping relationship in self.correlated_map, rewrite outer 
table columns to delim scan table columns.
   
   #### 2.1.1. Non-Nesting DependentJoin
   
   For non-nested DependentJoin, it recursively calls decorrelate(left), with 
the purpose of inheriting the information of the current 
DependentJoinDecorrelator. Since it's a non-nesting DependentJoin, no changes 
need to be made to the correlated column information.
   
   #### 2.1.2. Nesting DependentJoin
   
   For Nesting DependentJoin, it determines whether the LHS has correlated 
columns at the current depth (corresponding to accessing in the paper). The 
logic for determining the existence of correlated columns is in 
detect_correlated_expressions, which traverses the plan, finds all outer ref 
expressions for each plan_node, and checks whether they are in the domains of 
the current DecorrelateDependentJoin. The domains record all correlated columns 
of the current depth.
   + If the LHS has no correlated columns at the current depth, a new 
DependentJoinDecorrelator can be constructed to decorrelate the left child, 
because subsequent LHS processing is independent of the information (mainly 
correlated columns) in the current DependentJoinDecorrelator.
   + If the LHS has correlated columns at the current depth, the DependentJoin 
in the LHS needs to be pushed down to eliminate the DependentJoin of the LHS at 
the current depth.
   
   ### 2.2. Handle Right Child
   
   Processing the RHS constructs a new DependentJoinDecorrelator. The domains 
of the new DependentJoinDecorrelator are parent correlated columns of current 
level + dependent join of current level. By inheriting the correlated columns 
of the parent node, it handles multi-level dependent joins.
   
   Since the RHS of DependentJoin must have correlated columns, the right child 
is directly processed through push_down_dependent_join.
   
   ### 2.3. Join Condition
   
   In delim_join_conditions, different join types are constructed based on the 
subquery type in DependentJoin, and then Join Conditions of correlated column 
IsNotDistinctFrom delim scan column are constructed.
   
   ### 2.4 Unnest Function
   
   All Unnest logic is implemented in push_down_dependent_join, with different 
processing for each type of plan_node.
   
   ## 3. Eliminate Duplicated DelimGet
   
   The above two steps completed the decorrelation operation and introduced two 
new logical operators `DelimJoin` and `DelimGet` to fully construct the logical 
plan. At this stage, redundant `DelimJoin` and `DelimGet` will be eliminated.
   
   Construct a new use case to illustrate how the work at this stage is carried 
out:
   ```sql
   SELECT s.name
   FROM students s
   WHERE s.grade > (
       -- subquery 1
       SELECT AVG(e1.score)
       FROM exams e1
       WHERE e1.sid = s.id
       AND e1.score > (
           -- subquery 2
           SELECT MIN(e2.score)
           FROM exams e2
           WHERE e2.sid = s.id
           AND e2.type = 'final'
       )
   );
   ```
   
   After decorrelating the above SQL, the result is generated:
   ```text
               Projection
                   |
                 Filter
                   |
               DelimJoin1
             /            \
          Get           Projection
                              |
                          ComparisonJoin
                        /               \
                     DelimGet1         Aggregate
                                          |
                                        Filter
                                          |
                                       DelimJoin2
                                    /              \
                            CrossProduct          Projection
                            /      \                  |
                         Get    DelimGet2       ComparisonJoin
                                                /            \
                                           DelimGet3        Aggregate
                                                                |
                                                              Filter
                                                                |
                                                          CrossProduct
                                                          /           \
                                                        Get        DelimGet4
   ```
   
   First, all DelimJoins and DelimGets under DelimJoins are collected as 
candidates. For the joins array under each candidate, they are sorted by depth 
from largest to smallest, finding the DelimGet with the deepest depth.
   ```text
   candidate1: {
       DelimJoin2,
       joins: [DelimGet4(depth=3), DelimGet3(depth=1)],
   }
   
   candidate2: {
       DelimJoin1,
       joins: [DelimGet2(depth=5), DelimGet1(depth=1)],
   }
   
        Projection
            |
          Filter
            |
        DelimJoin1
      /    ^       \
   Get     |     Projection
           |           |
           |       ComparisonJoin
           |     /               \
           |  DelimGet1         Aggregate
           |      |                |
           +------+              Filter
           |                       |
           |                    DelimJoin2
           |                 /      ^       \
           |         CrossProduct   |      Projection
           |         /      \       |          |
           |      Get    DelimGet2  |    ComparisonJoin
           |                 |      |    /            \
           + ----------------+      |  DelimGet3     Aggregate
                                    |      |             |
                                    +------+           Filter
                                    |                    |
                                    |              CrossProduct
                                    |              /           \
                                    |            Get        DelimGet4
                                    |                          |
                                    +--------------------------+
   ```
   
   Process the deduplication of DelimGet under each candidate separately. 
Taking candidate1 as an example:
   ```text
   candidate1: {
       DelimJoin2,
       joins: [DelimGet4(depth=3), DelimGet3(depth=1)],
   }
   ```
   
   The situation is divided into two categories:
   
   1. If DelimJoin has selection conditions (filter/where clauses), DelimGet 
can be retained. This is because selection conditions can greatly reduce the 
amount of data in the right subtree of DelimJoin. This is an optimization 
trade-off that sacrifices certain deduplication optimization opportunities 
(retaining one DelimGet) but gains better query performance (through early 
filtering of selection conditions). This is why the code deliberately retains 
the deepest join with DelimGet when there are selection conditions. In this 
case:
       + Retaining the deepest join with DelimGet is meaningful because this 
selection condition can filter out a large amount of data early
       + Removing other shallower joins is still safe because the deepest one 
has already guaranteed data correctness
       + It also avoids duplicate join operations
   2. For cases where DelimJoin has no selection conditions, it's necessary to 
continue determining whether DelimGet can be removed. The ultimate goal is to 
remove all correlated columns.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to