irenjj commented on PR #17110: URL: https://github.com/apache/datafusion/pull/17110#issuecomment-3172677607
# Decorrelate Subqueries

Complete decorrelation consists of three parts:

+ rewrite dependent join: rewrite_dependent_join.rs
+ decorrelate dependent join: decorrelate_dependent_join.rs
+ eliminate duplicated delim scan: deliminator.rs

## 1. Rewrite Dependent Join

Rewrite dependent join rewrites plan_nodes that contain subquery expressions into dependent joins. This makes the dependent joins easier to process in later steps, with the ultimate goal of eliminating them.

DependentJoinRewriter structure:

```rust
pub struct DependentJoinRewriter {
    // ID of the currently traversed node, incremented on each f_down(), decremented on each f_up().
    current_id: usize,
    // Depth of the current DependentJoin,
    // incremented by 1 when encountering nested DependentJoin in f_down(),
    // decremented by 1 when processing DependentJoin in f_up().
    subquery_depth: usize,
    // current_id -> Node
    // Maintains an IndexMap structure that stores all plan_nodes currently traversed.
    nodes: IndexMap<usize, Node>,
    // all the node ids from root to the current node
    stack: Vec<usize>,
    // for each column, track the plan nodes that reference it within the tree
    all_outer_ref_columns: IndexMap<Column, Vec<ColumnAccess>>,
}
```

It performs a pre-order traversal of the operator tree and rewrites the traversed operators through f_down() and f_up(). During the traversal it assigns a unique current_id to each plan_node.

### 1.1 f_down

f_down checks whether the expressions of each plan_node contain Subqueries, and marks plan_nodes that do as is_dependent_join_node. Later, when the traversal reaches the plan_node of the Subquery itself, it obtains the Subquery type.

## 2. Decorrelate Dependent Join

Decorrelate dependent join performs decorrelation on the plan produced by rewrite dependent join. The implementation largely follows the algorithm in the paper, with slight differences.
The entry function is `DecorrelateDependentJoin::rewrite()`, which constructs a DependentJoinDecorrelator and calls its decorrelate() function. decorrelate() performs a pre-order traversal of the plan_node tree until it finds a DependentJoin:

```rust
fn decorrelate(plan: &LogicalPlan, ...) -> Result<LogicalPlan> {
    if let LogicalPlan::DependentJoin(djoin) = plan {
        // handle dependent join
    } else {
        // not a DependentJoin: recurse into the children
        Ok(plan
            .clone()
            .map_children(|n| Ok(Transformed::yes(self.decorrelate(&n, true, 0)?)))?
            .data)
    }
}
```

Processing a DependentJoin consists of several major steps:

+ Handle Left Child
+ Handle Right Child
+ Construct Join Condition

DependentJoinDecorrelator structure:

```rust
pub struct DependentJoinDecorrelator {
    // All correlated column information for the current depth.
    domains: IndexSet<CorrelatedColumnInfo>,
    // outer table column -> delim scan table column
    correlated_map: IndexMap<Column, Column>,
    // Whether the current DependentJoin is a nested DependentJoin
    is_initial: bool,
    // all correlated columns in current depth and downward
    correlated_columns: Vec<CorrelatedColumnInfo>,
    // check if we have to replace any COUNT aggregates into "CASE WHEN X IS NULL THEN 0 ELSE COUNT END"
    // store a mapping between a expr and its original index in the logplan output
    replacement_map: IndexMap<String, Expr>,
    // if during the top down traversal, we observe any operator that requires
    // joining all rows from the lhs with nullable rows on the rhs
    any_join: bool,
    delim_scan_id: usize,
    // All columns of the previously constructed delim scan.
    dscan_cols: Vec<Column>,
}
```

### 2.1. Handle Left Child

The left child is handled first. There are two cases:

+ The currently processed DependentJoin is not a nested DependentJoin.
+ The currently processed DependentJoin is a nested DependentJoin.

Afterwards, outer table columns are rewritten to delim scan table columns according to the mapping in self.correlated_map.

#### 2.1.1. Non-Nested DependentJoin

For a non-nested DependentJoin, decorrelate(left) is called recursively so that the left child inherits the information of the current DependentJoinDecorrelator. Because the DependentJoin is not nested, the correlated column information does not need to change.

#### 2.1.2. Nested DependentJoin

For a nested DependentJoin, it determines whether the LHS has correlated columns at the current depth (this corresponds to accessing in the paper). The check is implemented in detect_correlated_expressions, which traverses the plan, finds all outer ref expressions of each plan_node, and checks whether they are in the domains of the current DependentJoinDecorrelator. The domains record all correlated columns of the current depth.

+ If the LHS has no correlated columns at the current depth, a new DependentJoinDecorrelator can be constructed to decorrelate the left child, because subsequent LHS processing is independent of the information (mainly correlated columns) in the current DependentJoinDecorrelator.
+ If the LHS has correlated columns at the current depth, the DependentJoin needs to be pushed down into the LHS to eliminate the LHS's dependency at the current depth.

### 2.2. Handle Right Child

Processing the RHS constructs a new DependentJoinDecorrelator. The domains of the new DependentJoinDecorrelator are the correlated columns inherited from the parent plus the correlated columns of the dependent join at the current level. Inheriting the parent's correlated columns is what makes multi-level dependent joins work.

Since the RHS of a DependentJoin must have correlated columns, the right child is processed directly through push_down_dependent_join.

### 2.3. Join Condition

In delim_join_conditions, the join type is chosen based on the subquery type of the DependentJoin, and then join conditions of the form `correlated column IsNotDistinctFrom delim scan column` are constructed.
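As a rough illustration of the condition-building step, the sketch below pairs each outer column with its delim scan column and ANDs the resulting `IsNotDistinctFrom` predicates. `Column`, `Expr`, `column`, `build_delim_join_condition` and `is_not_distinct_from` are simplified, hypothetical stand-ins written for this note; they are not the DataFusion types or the actual body of delim_join_conditions. The presumed reason for using `IsNotDistinctFrom` rather than plain equality is that correlated values may be NULL and NULL must still match NULL.

```rust
// Hypothetical, simplified stand-ins for the planner's column and expression
// types. They only exist to make this sketch self-contained.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Column {
    pub relation: String,
    pub name: String,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Expr {
    Column(Column),
    IsNotDistinctFrom(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
}

/// Convenience constructor for a qualified column.
pub fn column(relation: &str, name: &str) -> Column {
    Column {
        relation: relation.to_string(),
        name: name.to_string(),
    }
}

/// Builds `outer IS NOT DISTINCT FROM delim` for every
/// (outer column, delim scan column) pair and ANDs the predicates together.
/// Returns `None` when there are no correlated columns.
pub fn build_delim_join_condition(pairs: &[(Column, Column)]) -> Option<Expr> {
    pairs
        .iter()
        .map(|(outer, delim)| {
            Expr::IsNotDistinctFrom(
                Box::new(Expr::Column(outer.clone())),
                Box::new(Expr::Column(delim.clone())),
            )
        })
        .reduce(|acc, next| Expr::And(Box::new(acc), Box::new(next)))
}

/// Null-safe equality on nullable values: NULL matches NULL,
/// which is exactly how `Option` equality behaves in Rust.
pub fn is_not_distinct_from(a: Option<i64>, b: Option<i64>) -> bool {
    a == b
}
```

With a single pair such as `(s.id, delim_scan_1.id)` the function yields one `IsNotDistinctFrom` node; with several pairs the predicates are chained left to right with `And`. The relation name `delim_scan_1` is made up for the example.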
### 2.4. Unnest Function

All unnest logic is implemented in push_down_dependent_join, with different processing for each type of plan_node.

## 3. Eliminate Duplicated DelimGet

The two steps above complete the decorrelation and introduce two new logical operators, `DelimJoin` and `DelimGet`, to fully construct the logical plan. At this stage, redundant `DelimJoin` and `DelimGet` operators are eliminated.

The following example illustrates how this stage works:

```sql
SELECT s.name
FROM students s
WHERE s.grade > (
    -- subquery 1
    SELECT AVG(e1.score)
    FROM exams e1
    WHERE e1.sid = s.id
      AND e1.score > (
        -- subquery 2
        SELECT MIN(e2.score)
        FROM exams e2
        WHERE e2.sid = s.id
          AND e2.type = 'final'
    )
);
```

Decorrelating the SQL above produces the following plan:

```text
            Projection
                |
              Filter
                |
            DelimJoin1
            /        \
          Get      Projection
                       |
                 ComparisonJoin
                  /         \
            DelimGet1     Aggregate
                              |
                            Filter
                              |
                          DelimJoin2
                          /        \
                 CrossProduct     Projection
                   /    \             |
                 Get  DelimGet2  ComparisonJoin
                                   /        \
                             DelimGet3    Aggregate
                                              |
                                            Filter
                                              |
                                         CrossProduct
                                           /     \
                                         Get   DelimGet4
```

First, every DelimJoin is collected as a candidate, together with the joins beneath it that contain a DelimGet. The joins array of each candidate is then sorted by depth in descending order, so that the deepest DelimGet comes first.

```text
candidate1: {
  DelimJoin2,
  joins: [DelimGet4(depth=3), DelimGet3(depth=1)],
}
candidate2: {
  DelimJoin1,
  joins: [DelimGet2(depth=5), DelimGet1(depth=1)],
}

                 Projection
                     |
                   Filter
                     |
                 DelimJoin1
                 /   ^    \
               Get   |   Projection
                     |       |
                     |  ComparisonJoin
                     |    /        \
                     | DelimGet1  Aggregate
                     |    |           |
                     +----+         Filter
                     |                |
                     |            DelimJoin2
                     |           /    ^     \
                     |  CrossProduct  |   Projection
                     |    /    \      |       |
                     |  Get DelimGet2 |  ComparisonJoin
                     |          |     |    /       \
                     +----------+     | DelimGet3  Aggregate
                                      |    |           |
                                      +----+         Filter
                                      |                |
                                      |          CrossProduct
                                      |            /     \
                                      |          Get   DelimGet4
                                      |                   |
                                      +-------------------+
```

The DelimGets under each candidate are then deduplicated separately.
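The candidate bookkeeping described above can be sketched roughly as follows. `JoinWithDelimGet` and `DelimCandidate` are hypothetical types invented for this note, operators are identified by plain strings, and the real deliminator.rs structures may look quite different; the only point is the descending sort by depth.

```rust
// Hypothetical bookkeeping types, for illustration only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JoinWithDelimGet {
    /// Name of the DelimGet reached through this join, e.g. "DelimGet4".
    pub delim_get: String,
    /// Depth of the DelimGet below the candidate DelimJoin.
    pub depth: usize,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DelimCandidate {
    /// Name of the candidate DelimJoin, e.g. "DelimJoin2".
    pub delim_join: String,
    /// Joins under the DelimJoin that contain a DelimGet.
    pub joins: Vec<JoinWithDelimGet>,
}

impl DelimCandidate {
    /// Sorts the joins by depth from largest to smallest,
    /// so the deepest DelimGet ends up at index 0.
    pub fn sort_deepest_first(&mut self) {
        self.joins.sort_by(|a, b| b.depth.cmp(&a.depth));
    }

    /// Returns the join with the deepest DelimGet, if any.
    pub fn deepest(&self) -> Option<&JoinWithDelimGet> {
        self.joins.iter().max_by_key(|j| j.depth)
    }
}
```

For candidate1, sorting joins that were collected in the order DelimGet3 (depth 1), DelimGet4 (depth 3) puts DelimGet4 first, matching the listing above.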
Taking candidate1 as an example:

```text
candidate1: {
  DelimJoin2,
  joins: [DelimGet4(depth=3), DelimGet3(depth=1)],
}
```

There are two cases:

1. If the DelimJoin has selection conditions (filter/where clauses), one DelimGet can be retained, because selection conditions can greatly reduce the amount of data in the right subtree of the DelimJoin. This is an optimization trade-off: it gives up some deduplication (one DelimGet is kept) in exchange for better query performance (the selection conditions filter early). This is why the code deliberately retains the deepest join with a DelimGet when there are selection conditions. In this case:
   + Retaining the deepest join with a DelimGet is worthwhile because the selection condition can filter out a large amount of data early.
   + Removing the other, shallower joins is still safe because the deepest one already guarantees data correctness.
   + It also avoids duplicate join operations.
2. If the DelimJoin has no selection conditions, the pass continues to determine whether each DelimGet can be removed. The ultimate goal is to remove all correlated columns.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
For additional commands, e-mail: github-h...@datafusion.apache.org