jiangjiangtian opened a new issue, #12413:
URL: https://github.com/apache/gluten/issues/12413

   ### Backend
   
   VL (Velox)
   
   ### Bug description
   
   Suppose we execute the following SQL (it is only a minimal reproduction and has no business meaning):
   ```SQL
   CREATE OR REPLACE TEMP VIEW employees AS
   SELECT * FROM VALUES
   (1, 'Alice', 'Sales,Computer', 1000),
   (2, 'Bob', 'Marketing,Sales', 2000),
   (3, 'Charlie', 'Marketing,Trading', 3000)
   AS employees(emp_id, emp_name, dept_names, sale);
   
   set spark.sql.adaptive.enabled=false;
   
   EXPLAIN WITH base AS (
   SELECT * FROM employees
     LATERAL VIEW explode(split(dept_names, ',')) AS dept_name
   )
   SELECT * FROM
   (
     SELECT emp_id, emp_name, COUNT(*)
     FROM (
       SELECT emp_id, emp_name, dept_names, sale
       FROM base
       GROUP BY 1, 2, 3, 4
     )
     GROUP BY 1, 2
   ) a
   JOIN
   (
     SELECT emp_id, SUM(sale)
     FROM (
       SELECT emp_id, emp_name, dept_names, sale
       FROM base
       GROUP BY 1, 2, 3, 4
     )
     GROUP BY 1
   ) b
   ON a.emp_id = b.emp_id;
   ```
   The plan is:
   ```
   == Physical Plan ==
   VeloxColumnarToRow
   +- ^(6) BroadcastHashJoinExecTransformer [emp_id#78], [emp_id#86], Inner, BuildRight, false
      :- ^(6) HashAggregateTransformer(keys=[emp_id#78, emp_name#79], functions=[count(1)], isStreamingAgg=false)
      :  +- ^(6) InputIteratorTransformer[emp_id#78, emp_name#79, count#101L]
      :     +- ColumnarExchange hashpartitioning(emp_id#78, emp_name#79, 2000), ENSURE_REQUIREMENTS, [emp_id#78, emp_name#79, count#101L], [plan_id=1889], [shuffle_writer_type=hash], [OUTPUT] List(emp_id:IntegerType, emp_name:StringType, count:LongType)
      :        +- VeloxResizeBatches 1024, 2147483647, 10485760
      :           +- ^(2) ProjectExecTransformer [hash(emp_id#78, emp_name#79, 42) AS hash_partition_key#129, emp_id#78, emp_name#79, count#101L]
      :              +- ^(2) FlushableHashAggregateTransformer(keys=[emp_id#78, emp_name#79], functions=[partial_count(1)], isStreamingAgg=false)
      :                 +- ^(2) ProjectExecTransformer [emp_id#78, emp_name#79]
      :                    +- ^(2) HashAggregateTransformer(keys=[emp_id#78, emp_name#79, dept_names#80, sale#81], functions=[], isStreamingAgg=false)
      :                       +- ^(2) InputIteratorTransformer[emp_id#78, emp_name#79, dept_names#80, sale#81]
      :                          +- ColumnarExchange hashpartitioning(emp_id#78, emp_name#79, dept_names#80, sale#81, 2000), ENSURE_REQUIREMENTS, [emp_id#78, emp_name#79, dept_names#80, sale#81], [plan_id=1880], [shuffle_writer_type=hash], [OUTPUT] List(emp_id:IntegerType, emp_name:StringType, dept_names:StringType, sale:IntegerType)
      :                             +- VeloxResizeBatches 1024, 2147483647, 10485760
      :                                +- ^(1) ProjectExecTransformer [hash(emp_id#78, emp_name#79, dept_names#80, sale#81, 42) AS hash_partition_key#128, emp_id#78, emp_name#79, dept_names#80, sale#81]
      :                                   +- ^(1) FlushableHashAggregateTransformer(keys=[emp_id#78, emp_name#79, dept_names#80, sale#81], functions=[], isStreamingAgg=false)
      :                                      +- ^(1) ProjectExecTransformer [emp_id#78, emp_name#79, dept_names#80, sale#81]
      :                                         +- ^(1) GenerateExecTransformer explode(split(dept_names#80, ,, -1) AS _pre_0#104), [emp_id#78, emp_name#79, dept_names#80, sale#81], false, [dept_name#95]
      :                                            +- ^(1) ProjectExecTransformer [emp_id#78, emp_name#79, dept_names#80, sale#81, split(dept_names#80, ,, -1) AS _pre_0#104]
      :                                               +- ^(1) InputIteratorTransformer[emp_id#78, emp_name#79, dept_names#80, sale#81]
      :                                                  +- RowToVeloxColumnar
      :                                                     +- LocalTableScan [emp_id#78, emp_name#79, dept_names#80, sale#81]
      +- ^(6) InputIteratorTransformer[emp_id#86, sum(sale)#99L]
         +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=1916]
            +- ^(5) HashAggregateTransformer(keys=[emp_id#86], functions=[sum(sale#89)], isStreamingAgg=false)
               +- ^(5) InputIteratorTransformer[emp_id#86, sum#103L]
                  +- ColumnarExchange hashpartitioning(emp_id#86, 2000), ENSURE_REQUIREMENTS, [emp_id#86, sum#103L], [plan_id=1911], [shuffle_writer_type=hash], [OUTPUT] List(emp_id:IntegerType, sum:LongType)
                     +- VeloxResizeBatches 1024, 2147483647, 10485760
                        +- ^(4) ProjectExecTransformer [hash(emp_id#86, 42) AS hash_partition_key#131, emp_id#86, sum#103L]
                           +- ^(4) FlushableHashAggregateTransformer(keys=[emp_id#86], functions=[partial_sum(sale#89)], isStreamingAgg=false)
                              +- ^(4) ProjectExecTransformer [emp_id#86, sale#89]
                                 +- ^(4) HashAggregateTransformer(keys=[emp_id#86, emp_name#87, dept_names#88, sale#89], functions=[], isStreamingAgg=false)
                                    +- ^(4) InputIteratorTransformer[emp_id#86, emp_name#87, dept_names#88, sale#89]
                                       +- ColumnarExchange hashpartitioning(emp_id#86, emp_name#87, dept_names#88, sale#89, 2000), ENSURE_REQUIREMENTS, [emp_id#86, emp_name#87, dept_names#88, sale#89], [plan_id=1902], [shuffle_writer_type=hash], [OUTPUT] List(emp_id:IntegerType, emp_name:StringType, dept_names:StringType, sale:IntegerType)
                                          +- VeloxResizeBatches 1024, 2147483647, 10485760
                                             +- ^(3) ProjectExecTransformer [hash(emp_id#86, emp_name#87, dept_names#88, sale#89, 42) AS hash_partition_key#130, emp_id#86, emp_name#87, dept_names#88, sale#89]
                                                +- ^(3) FlushableHashAggregateTransformer(keys=[emp_id#86, emp_name#87, dept_names#88, sale#89], functions=[], isStreamingAgg=false)
                                                   +- ^(3) ProjectExecTransformer [emp_id#86, emp_name#87, dept_names#88, sale#89]
                                                      +- ^(3) GenerateExecTransformer explode(split(dept_names#88, ,, -1) AS _pre_1#105), [emp_id#86, emp_name#87, dept_names#88, sale#89], false, [dept_name#96]
                                                         +- ^(3) ProjectExecTransformer [emp_id#86, emp_name#87, dept_names#88, sale#89, split(dept_names#88, ,, -1) AS _pre_1#105]
                                                            +- ^(3) InputIteratorTransformer[emp_id#86, emp_name#87, dept_names#88, sale#89]
                                                               +- RowToVeloxColumnar
                                                                  +- LocalTableScan [emp_id#86, emp_name#87, dept_names#88, sale#89]
   ```
   The plan is correct, but it misses the exchange-reuse optimization. The two ColumnarExchange nodes (plan_id=1880 and plan_id=1902) are structurally identical, yet they are not reused (no ReusedExchange appears in the plan).
   The root cause is the Alias expression in the generator of 
GenerateExecTransformer. In the plan above, the first branch has:
   ```
   GenerateExecTransformer explode(split(dept_names#80, ,, -1) AS _pre_0#104), ...
   ```
   while the second branch has:
   ```
   GenerateExecTransformer explode(split(dept_names#88, ,, -1) AS _pre_1#105), ...
   ```
   The Alias (`AS _pre_0#104` / `AS _pre_1#105`) is nested inside the generator expression. Spark's plan canonicalization (`QueryPlan.doCanonicalize`) assigns position-based ExprIds only to top-level Alias nodes in the plan's expressions. Since the generator (e.g., `Explode`) is not an Alias, it falls into the `case other` branch, which calls [normalizeExpressions](https://github.com/apache/spark/blob/branch-3.5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala#L650-L668). However, `normalizeExpressions` only replaces the exprId of `AttributeReference`s; it does not modify the exprId of a nested Alias. As a result, the Alias inside the generator keeps its original, globally unique exprId (#104 vs. #105). Since Alias equality compares exprId, the two structurally identical sub-plans produce different canonicalized forms, which prevents ReuseExchange from reusing the exchange.
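
The following is a toy Python model of the canonicalization behavior described above. It is *not* Spark or Gluten code: the classes `Attr`, `Alias`, `Call`, `Lit` and the helpers `normalize`, `canonicalize`, `generate_node` are hypothetical stand-ins for `AttributeReference`, `Alias`, `doCanonicalize` and `normalizeExpressions`. It rebuilds the two `GenerateExecTransformer` nodes from the plan. It then shows that their canonical forms differ only because of the nested Alias. The `fix_nested` flag is a hypothetical variant that treats the nested Alias the way top-level Aliases are already treated (positional id, empty name). With that flag set, the two nodes compare equal.

```python
# Toy model (NOT Spark/Gluten code) of plan-node expression canonicalization.
import itertools
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class Attr:  # stand-in for AttributeReference
    name: str
    expr_id: int


@dataclass(frozen=True)
class Alias:  # stand-in for Alias; equality covers name and exprId
    child: "Expr"
    name: str
    expr_id: int


@dataclass(frozen=True)
class Call:  # any other expression, e.g. explode(...) or split(...)
    fn: str
    args: Tuple["Expr", ...]


@dataclass(frozen=True)
class Lit:
    value: object


Expr = Union[Attr, Alias, Call, Lit]


def normalize(e, input_ids, nested_ids=None):
    """Like normalizeExpressions: rewrite input attribute ids to ordinals.
    Nested Aliases stay untouched unless nested_ids (hypothetical fix) is given."""
    if isinstance(e, Attr):
        return Attr(e.name, input_ids.index(e.expr_id)) if e.expr_id in input_ids else e
    if isinstance(e, Call):
        return Call(e.fn, tuple(normalize(a, input_ids, nested_ids) for a in e.args))
    if isinstance(e, Alias):
        child = normalize(e.child, input_ids, nested_ids)
        if nested_ids is None:
            return Alias(child, e.name, e.expr_id)  # current: name and exprId kept
        return Alias(child, "", next(nested_ids))   # hypothetical: positional id, name erased
    return e  # literals are unchanged


def canonicalize(exprs, child_output, fix_nested=False):
    input_ids = [a.expr_id for a in child_output]
    top_ids = itertools.count()
    nested_ids = itertools.count() if fix_nested else None
    out = []
    for e in exprs:
        if isinstance(e, Alias):  # top-level Alias: positional id, empty name
            out.append(Alias(normalize(e.child, input_ids, nested_ids), "", next(top_ids)))
        elif isinstance(e, Attr) and e.expr_id not in input_ids:  # top-level output attribute
            out.append(Attr(e.name, next(top_ids)))
        else:  # `case other`, e.g. the explode(...) generator
            out.append(normalize(e, input_ids, nested_ids))
    return tuple(out)


def generate_node(base_id, pre_name, pre_id, out_id):
    """Expressions and child output of a toy GenerateExecTransformer from the plan."""
    names = ["emp_id", "emp_name", "dept_names", "sale"]
    child = [Attr(n, base_id + i) for i, n in enumerate(names)]
    split = Call("split", (child[2], Lit(","), Lit(-1)))
    generator = Call("explode", (Alias(split, pre_name, pre_id),))
    return [generator, *child, Attr("dept_name", out_id)], child


left = generate_node(78, "_pre_0", 104, 95)   # first branch, stage ^(1)
right = generate_node(86, "_pre_1", 105, 96)  # second branch, stage ^(3)
print(canonicalize(*left) == canonicalize(*right))                                    # False
print(canonicalize(*left, fix_nested=True) == canonicalize(*right, fix_nested=True))  # True
```

In this model, every canonicalized expression except the generator already matches: `canonicalize(*left)[1:] == canonicalize(*right)[1:]` holds. The nested Alias alone breaks equality. The `fix_nested` path only illustrates the point and is not a proposed patch.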
   
   ### Gluten version
   
   _No response_
   
   ### Spark version
   
   3.5
   
   ### Spark configurations
   
   None
   
   ### System information
   
   _No response_
   
   ### Relevant logs
   
   ```bash
   
   ```

