[ https://issues.apache.org/jira/browse/SPARK-51582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17937386#comment-17937386 ]

Shardul Mahadik commented on SPARK-51582:
-----------------------------------------

cc: [~maropu] [~kiszk] [~viirya] It would be great to hear your thoughts since 
you have worked on/reviewed Expand codegen before. Also cc: [~cloud_fan] since 
this is SQL-related.

> Improve codegen for ExpandExec with high number of projections
> --------------------------------------------------------------
>
>                 Key: SPARK-51582
>                 URL: https://issues.apache.org/jira/browse/SPARK-51582
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.5.5
>            Reporter: Shardul Mahadik
>            Priority: Major
>
> Currently, ExpandExec uses a [switch-case based 
> codegen|https://github.com/apache/spark/blob/7ce5d3294325de6ef5d8d07d611e92975092d31f/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala#L118]
>  mechanism in which the amount of generated code is proportional to (N * O), 
> where N is the number of output rows produced per input row and O is the 
> number of output columns. For a cubing operation (or other similar grouping 
> sets operations), which seems to be the main use case for Expand, the number 
> of output rows per input row is N = 2^D, where D is the number of cube 
> dimensions. We find that the current codegen for ExpandExec fails for as low 
> as D = 10, because the generated code hits the 64KB JVM limit on bytecode per 
> method. SPARK-35329 tries to improve on this by moving the contents of each 
> case block into its own method, but the code within the _consume_ method is 
> still proportional to N (which can be much higher than O) and fails for as 
> low as D = 12. This is unfortunately not sufficient for some of our use 
> cases, where D can go as high as 16.
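> To make the size argument concrete, the generated code has roughly the shape 
> sketched below. This is a hand-written Scala analogue of the generated Java, 
> purely to illustrate where the (N * O) code-size term comes from; the names 
> and row layout are illustrative, not Spark's actual output.
> {code:scala}
> import scala.annotation.switch
>
> // Toy analogue of the switch-based approach for a 2-dimension cube over
> // (k1, k2) with one value column: one case per projection, and every case
> // assigns all O output columns inline, so the method holds ~N * O statements.
> object SwitchShapeDemo {
>   def main(args: Array[String]): Unit = {
>     val input  = Array[Any]("a", "b", 10L)  // (k1, k2, val1), D = 2
>     val output = new Array[Any](4)          // O = 4 output columns (k1, k2, val1, gid)
>     var i = 0
>     while (i < 4) {                         // N = 2^D = 4 output rows per input row
>       (i: @switch) match {
>         case 0 => output(0) = input(0); output(1) = input(1); output(2) = input(2); output(3) = 1
>         case 1 => output(0) = null;     output(1) = input(1); output(2) = input(2); output(3) = 2
>         case 2 => output(0) = input(0); output(1) = null;     output(2) = input(2); output(3) = 3
>         case _ => output(0) = null;     output(1) = null;     output(2) = input(2); output(3) = 4
>       }
>       println(output.mkString("[", ", ", "]"))  // stand-in for consuming the row
>       i += 1
>     }
>   }
> }
> {code}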
> Instead of generating switch-cases, we can do the following:
>  # Find all the unique individual expressions across [the 
> projections|https://github.com/apache/spark/blob/7ce5d3294325de6ef5d8d07d611e92975092d31f/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala#L37].
>  For most use cases, this number is much smaller than (N * O). E.g. for the 
> cubing operation, the number of unique expressions is (N + O): O because there 
> is one expression per output column (dimensions and values), and N because 
> Spark assigns the literals 1…N to the rows of the Expand output. Consider
> {code:java}
> projections = [ [k1, k2, k3, val1, 1],
>                 [null, k2, k3, val1, 2],
>                 [k1, null, k3, val1, 3],
>                 .
>                 .
>                 [null, null, null, val1, 8] ]
> {code}
> then
> {code:java}
> uniqueExprs = [null, k1, k2, k3, val1, 1, 2, … N]
> {code}
>  # Create an expression map which follows the same structure as projections, 
> but replaces each expression with its index in uniqueExprs
> {code:java}
> exprMap = [ [1, 2, 3, 4, 5],
>             [0, 2, 3, 4, 6],
>             [1, 0, 3, 4, 7],
>             .
>             .
>             [0, 0, 0, 4, 12] ]
> {code}
>  # Calculate the value of all the unique expressions once per input row.
> {code:java}
> exprOutputs = [ eval(null), eval(k1), eval(k2), … ]
> {code}
>  # For every projection, for every output column, assign the value by looking 
> up the corresponding value in exprOutputs using the index from exprMap
> Pseudocode:
> {code:java}
> val uniqueExprs = distinct on projections.flatten
> val exprMap = for each expr in projections, find its index in uniqueExprs
> do_consume():
>     val exprOutputs = evaluate all exprs in uniqueExprs on input row
>     for (i : projections.indices) {
>         // find the value of each output column by fetching the index from
>         // exprMap and using that index to offset into exprOutputs
>         index = exprMap[i][0]
>         output[0] = exprOutputs[index]
>         index = exprMap[i][1]
>         output[1] = exprOutputs[index]
>         .
>         .
>         .
>         // emit the output row for projection i
>     }
> {code}
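> To make the pseudocode concrete, here is a small, self-contained Scala sketch 
> of the same idea written as plain runtime code rather than generated code. 
> Expression "evaluation" is reduced to column lookups and literals, and all 
> names are illustrative, not Spark's actual implementation.
> {code:scala}
> object ExpandIndexDemo {
>   // Tiny expression model standing in for Catalyst expressions.
>   sealed trait Expr
>   case class Col(name: String) extends Expr   // reference to an input column
>   case class Lit(value: Any) extends Expr     // literal (null, 1, 2, ...)
>
>   def main(args: Array[String]): Unit = {
>     // Projections for a 2-dimension cube over (k1, k2) with one value column, N = 4.
>     val projections: Seq[Seq[Expr]] = Seq(
>       Seq(Col("k1"), Col("k2"), Col("val1"), Lit(1)),
>       Seq(Lit(null), Col("k2"), Col("val1"), Lit(2)),
>       Seq(Col("k1"), Lit(null), Col("val1"), Lit(3)),
>       Seq(Lit(null), Lit(null), Col("val1"), Lit(4)))
>
>     // Steps 1 and 2 (done once, at codegen time): dedup the expressions and
>     // record, per projection and output column, the index into uniqueExprs.
>     val uniqueExprs: IndexedSeq[Expr] = projections.flatten.distinct.toIndexedSeq
>     val exprIndex: Map[Expr, Int] = uniqueExprs.zipWithIndex.toMap
>     val exprMap: Seq[Seq[Int]] = projections.map(_.map(exprIndex))
>
>     // Steps 3 and 4 (per input row): evaluate each unique expression once,
>     // then build every output row by plain index lookups.
>     val inputRow = Map[String, Any]("k1" -> "a", "k2" -> "b", "val1" -> 10L)
>     val exprOutputs: IndexedSeq[Any] = uniqueExprs.map {
>       case Col(name) => inputRow(name)
>       case Lit(v)    => v
>     }
>     for (indices <- exprMap) {
>       val outputRow = indices.map(exprOutputs)  // one lookup per output column
>       println(outputRow.mkString("[", ", ", "]"))
>     }
>   }
> }
> {code}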
> With this approach, steps 1 and 2 are performed during codegen and their 
> outputs are added to reference objects that can be accessed during code 
> execution. We can further optimize step 3 by evaluating the literals in 
> uniqueExprs immediately during codegen and only generating code for the 
> non-literal expressions. With that, the amount of generated code is roughly 
> proportional to (2 * O): one O for step 3, assuming one expression per output 
> column, and another O for step 4.
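> As a minimal sketch of that literal split, reusing the toy Expr/Col/Lit model 
> and the uniqueExprs value from the sketch above (illustrative only; the real 
> implementation would work on Catalyst expressions inside ExpandExec's codegen):
> {code:scala}
> // Partition once, at codegen time: literal slots never change between rows.
> val (literals, nonLiterals) =
>   uniqueExprs.zipWithIndex.partition { case (e, _) => e.isInstanceOf[Lit] }
>
> val exprOutputs = new Array[Any](uniqueExprs.length)
> for ((expr, idx) <- literals) expr match {
>   case Lit(v) => exprOutputs(idx) = v   // filled immediately, only once
>   case _      => // unreachable: the partition kept only literals here
> }
>
> // Only the non-literal slots need per-row evaluation code, so this part of
> // the generated method is ~O in size rather than ~(N * O).
> def evalNonLiterals(inputRow: Map[String, Any]): Unit =
>   for ((expr, idx) <- nonLiterals) expr match {
>     case Col(name) => exprOutputs(idx) = inputRow(name)
>     case _         => // unreachable: literals were handled above
>   }
> {code}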
> Performance:
> Some initial benchmarking suggests that this approach is ~25% slower than the 
> switch-case approach when D is small (D <= 8). I am assuming this is due to 
> the cost of the additional array accesses compared to switch cases. For D > 8, 
> the proposed approach is around 2x faster (this needs more analysis to 
> understand why the performance of the switch-case approach drops so sharply 
> beyond D = 8). For D > 11, the switch-case approach fails entirely due to the 
> JVM method size limit, while the proposed approach continues to scale, with 
> runtime proportional to 2^D.
> I wanted to get some thoughts on whether this approach is reasonable and 
> something that could be added to Spark. We could choose to use this approach 
> only for cases with a large number of projections. The code needs some 
> cleanup and more testing before I can raise a PR.


