[ https://issues.apache.org/jira/browse/SPARK-51582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17937386#comment-17937386 ]
Shardul Mahadik commented on SPARK-51582:
-----------------------------------------

cc: [~maropu] [~kiszk] [~viirya] It would be great to hear your thoughts since you have worked on/reviewed Expand codegen before. Also cc: [~cloud_fan] since this is SQL related.

> Improve codegen for ExpandExec with high number of projections
> ---------------------------------------------------------------
>
>                 Key: SPARK-51582
>                 URL: https://issues.apache.org/jira/browse/SPARK-51582
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.5.5
>            Reporter: Shardul Mahadik
>            Priority: Major
>
> Currently, ExpandExec uses a [switch-case based codegen|https://github.com/apache/spark/blob/7ce5d3294325de6ef5d8d07d611e92975092d31f/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala#L118] mechanism where the amount of code generated is proportional to (N * O), where N is the number of rows produced per input row and O is the number of output columns. For a cubing operation (or other similar grouping sets operations), which seems to be the main use case for Expand, the number of output rows per input row is N = 2^D, where D is the number of cubing dimensions. We find that the current codegen for ExpandExec fails for as low as D = 10, because the generated code hits the 64KB JVM limit for bytecode per method. SPARK-35329 tries to improve on this by moving the contents of each case block to its own method, but the code within the _consume_ method is still proportional to N (which can be much higher than O) and fails for as low as D = 12. This unfortunately is not sufficient for some of our use cases, where D can go as high as 16.
> Instead of generating switch-cases, we can do the following:
> # Find all the unique individual expressions across [the projections|https://github.com/apache/spark/blob/7ce5d3294325de6ef5d8d07d611e92975092d31f/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala#L37]. For most use cases, this number is much smaller than (N * O). E.g. for the cubing operation, the number of individual expressions is (N + O): O because there is one expression per output column (dimensions and values), and N because Spark assigns the literals 1…N to each row of the Expand output. Consider
> {code:java}
> projections = [ [k1,   k2,   k3,   val1, 1],
>                 [null, k2,   k3,   val1, 2],
>                 [k1,   null, k3,   val1, 3],
>                 .
>                 .
>                 [null, null, null, val1, 8] ]
> {code}
> then
> {code:java}
> uniqueExprs = [null, k1, k2, k3, val1, 1, 2, … N]
> {code}
> # Create an expression map which follows the same structure as projections, but replaces each expression with its index in uniqueExprs
> {code:java}
> exprMap = [ [1, 2, 3, 4, 5],
>             [0, 2, 3, 4, 6],
>             [1, 0, 3, 4, 7],
>             .
>             .
>             [0, 0, 0, 4, 12] ]
> {code}
> # Calculate the value of all the unique expressions once per input row.
> {code:java}
> exprOutputs = [ eval(null), eval(k1), eval(k2), ….. ]
> {code}
> # For every projection, for every output column, assign the value by looking up the corresponding value in exprOutputs using the index from exprMap.
> Pseudocode (a runnable sketch follows below):
> {code:java}
> val uniqueExprs = distinct on projections.flatten
> val exprMap = for each expr in projections, find its index in uniqueExprs
>
> do_consume():
>   val exprOutputs = evaluate all exprs in uniqueExprs on input row
>   for (i : projections.indices) {
>     // find the value of each output column by fetching the index from exprMap
>     // and using that index to offset into exprOutputs
>     index = exprMap[i][0]
>     output[0] = exprOutputs[index]
>     index = exprMap[i][1]
>     output[1] = exprOutputs[index]
>     .
>     .
>     .
>   }
> {code}
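> To make the scheme concrete, here is a minimal, self-contained Scala sketch of steps 1–4. It operates on plain string values rather than Catalyst expressions, includes only the four projections spelled out above (so the exact index values differ from the worked example), and names like ExpandIndexSketch are made up for illustration; it is not the proposed codegen itself.
> {code:scala}
> // Minimal sketch of the indexing scheme on plain values (not the actual Spark codegen).
> object ExpandIndexSketch {
>   def main(args: Array[String]): Unit = {
>     // The projections written out explicitly in the example above (elided rows omitted).
>     val projections: Seq[Seq[String]] = Seq(
>       Seq("k1",   "k2",   "k3",   "val1", "1"),
>       Seq("null", "k2",   "k3",   "val1", "2"),
>       Seq("k1",   "null", "k3",   "val1", "3"),
>       Seq("null", "null", "null", "val1", "8"))
>
>     // Step 1 (codegen time): collect the unique expressions across all projections.
>     val uniqueExprs: Seq[String] = projections.flatten.distinct
>
>     // Step 2 (codegen time): replace every expression with its index into uniqueExprs.
>     val exprMap: Array[Array[Int]] =
>       projections.map(_.map(e => uniqueExprs.indexOf(e)).toArray).toArray
>
>     // Step 3 (runtime, once per input row): evaluate each unique expression once.
>     // Evaluation is just identity here; Spark would evaluate against the input row.
>     val exprOutputs: Array[String] = uniqueExprs.toArray
>
>     // Step 4 (runtime): build every output row by indexing into exprOutputs.
>     for (i <- projections.indices) {
>       val output = exprMap(i).map(idx => exprOutputs(idx))
>       println(output.mkString("[", ", ", "]"))
>     }
>   }
> }
> {code}
> The key point is that the only per-projection work emitted into the consume method is the indexed copy in step 4, whose size depends on O but not on N.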
> With this approach, steps 1 and 2 are performed during codegen and their outputs are added to reference objects that can be accessed during code execution. Also, we can further optimize step 3 by evaluating literals in uniqueExprs immediately during codegen and only generating code for evaluating non-literal expressions in step 3. In this case, the amount of code generated will be roughly proportional to (2 * O) (one O for step 3, assuming one expression per output column, and another O for step 4).
> Performance:
> Some initial benchmarking suggests that this approach is ~25% slower than switch cases when D is small (D <= 8). I am assuming that this is due to the cost of the additional array accesses as compared to switch cases. For D > 8, the proposed approach is around 2x faster (this needs more analysis to understand why the performance of switch cases falls drastically at D > 8). For D > 11, the switch-case approach fails entirely due to the JVM method size limit, but the proposed approach continues to scale, with performance proportional to 2^D.
> I wanted to get some thoughts on whether this approach is reasonable and is something that can be added to Spark. We can choose to only use this approach for cases with a large number of projections. The code needs some cleanup and more testing before I can raise a PR.
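> For a rough sense of why the switch-case approach breaks down, the snippet below just plugs D and an assumed column count into the two proportionality factors discussed above (N * O vs. 2 * O); O = 20 and the resulting statement counts are illustrative, not measured generated-code sizes.
> {code:scala}
> // Rough scale of generated per-column assignments, using the proportionality
> // factors above: switch-case ~ N * O with N = 2^D, proposed ~ 2 * O.
> object CodegenScaleSketch {
>   def main(args: Array[String]): Unit = {
>     val numOutputCols = 20L // O: arbitrary example value, not from a real query
>     for (d <- Seq(8, 10, 12, 16)) {
>       val numProjections = 1L << d // N = 2^D
>       val switchCase = numProjections * numOutputCols
>       val proposed = 2L * numOutputCols
>       println(s"D=$d: N=$numProjections, switch-case ~ $switchCase assignments, proposed ~ $proposed")
>     }
>   }
> }
> {code}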