[ https://issues.apache.org/jira/browse/SPARK-23032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-23032:
------------------------------------

    Assignee:     (was: Apache Spark)

> Add a per-query codegenStageId to WholeStageCodegenExec
> -------------------------------------------------------
>
>                 Key: SPARK-23032
>                 URL: https://issues.apache.org/jira/browse/SPARK-23032
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Kris Mok
>
> Proposing to add a per-query ID to the codegen stages as represented by
> {{WholeStageCodegenExec}} operators. This ID will be used in
> * the explain output of the physical plan, and in
> * the generated class name.
>
> Specifically, this ID will be stable within a query, counting up from 1 in
> depth-first post-order for all the {{WholeStageCodegenExec}} inserted into a
> plan.
> The ID value 0 is reserved for "free-floating" {{WholeStageCodegenExec}}
> objects, which may have been created for one-off purposes, e.g. for fallback
> handling of codegen stages that failed to codegen the whole stage and wish
> to codegen a subset of the children operators.
>
> Example: for the following query:
> {code:none}
> scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1)
> scala> val df1 = spark.range(10).select('id as 'x, 'id + 1 as 'y).orderBy('x).select('x + 1 as 'z, 'y)
> df1: org.apache.spark.sql.DataFrame = [z: bigint, y: bigint]
> scala> val df2 = spark.range(5)
> df2: org.apache.spark.sql.Dataset[Long] = [id: bigint]
> scala> val query = df1.join(df2, 'z === 'id)
> query: org.apache.spark.sql.DataFrame = [z: bigint, y: bigint ... 1 more field]
> {code}
> The explain output before the change is:
> {code:none}
> scala> query.explain
> == Physical Plan ==
> *SortMergeJoin [z#9L], [id#13L], Inner
> :- *Sort [z#9L ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(z#9L, 200)
> :     +- *Project [(x#3L + 1) AS z#9L, y#4L]
> :        +- *Sort [x#3L ASC NULLS FIRST], true, 0
> :           +- Exchange rangepartitioning(x#3L ASC NULLS FIRST, 200)
> :              +- *Project [id#0L AS x#3L, (id#0L + 1) AS y#4L]
> :                 +- *Range (0, 10, step=1, splits=8)
> +- *Sort [id#13L ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(id#13L, 200)
>       +- *Range (0, 5, step=1, splits=8)
> {code}
> Note how codegen'd operators are annotated with a prefix {{"*"}}.
> After this change it'll be:
> {code:none}
> scala> query.explain
> == Physical Plan ==
> *(6) SortMergeJoin [z#9L], [id#13L], Inner
> :- *(3) Sort [z#9L ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(z#9L, 200)
> :     +- *(2) Project [(x#3L + 1) AS z#9L, y#4L]
> :        +- *(2) Sort [x#3L ASC NULLS FIRST], true, 0
> :           +- Exchange rangepartitioning(x#3L ASC NULLS FIRST, 200)
> :              +- *(1) Project [id#0L AS x#3L, (id#0L + 1) AS y#4L]
> :                 +- *(1) Range (0, 10, step=1, splits=8)
> +- *(5) Sort [id#13L ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(id#13L, 200)
>       +- *(4) Range (0, 5, step=1, splits=8)
> {code}
> Note that the annotated prefix becomes {{"*(id) "}}.
> It'll also show up in the name of the generated class, as a suffix in the
> format of
> {code:none}
> GeneratedClass$GeneratedIterator$id
> {code}
> For example, note how {{GeneratedClass$GeneratedIterator$3}} and
> {{GeneratedClass$GeneratedIterator$6}} in the following stack trace
> correspond to the IDs shown in the explain output above:
> {code:none}
> "Executor task launch worker for task 424@12957" daemon prio=5 tid=0x58 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>     at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$3.sort_addToSorter$(generated.java:32)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$3.processNext(generated.java:41)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$9$$anon$1.hasNext(WholeStageCodegenExec.scala:494)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$6.findNextInnerJoinRows$(generated.java:42)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$6.processNext(generated.java:101)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$2.hasNext(WholeStageCodegenExec.scala:513)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>     at org.apache.spark.scheduler.Task.run(Task.scala:109)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
>
> Rationale:
> Right now, the codegen from Spark SQL lacks the means to differentiate
> between a couple of things:
>
> 1. It's hard to tell which physical operators are in the same
> WholeStageCodegen stage. Note that this "stage" is a separate notion from
> Spark's RDD execution stages; this one is only to delineate codegen units.
> There can be adjacent physical operators that are both codegen'd but are in
> separate codegen stages. Some of this is due to hacky implementation details,
> such as the case with SortMergeJoin and its Sort inputs -- they're hard coded
> to be split into separate stages although both are codegen'd.
> When printing out the explain output of the physical plan, you'd only see the
> codegen'd physical operators annotated with a preceding star ('*') but would
> have no way to figure out if they're in the same stage.
>
> 2. Performance/error diagnosis
> The generated code has class/method names that are hard to differentiate
> between queries or even between codegen stages within the same query. If we
> use a Java-level profiler to collect profiles, or if we encounter a
> Java-level exception with a stack trace in it, it's really hard to tell which
> part of a query it's at.
> By introducing a per-query codegen stage ID, we'd at least be able to know
> which codegen stage (and in turn, which group of physical operators) a
> profile tick or an exception came from.
>
> This proposal uses a per-query ID because it's stable within a query, so
> that multiple runs of the same query will see the same resulting IDs. This
> both makes the output easier for users to understand and plays well with
> the codegen cache in Spark SQL, which uses the generated source code as the
> key.
> The downside to using per-query IDs as opposed to a per-session or globally
> incrementing ID is of course that we can't tell apart different query runs
> with this ID alone. But for now I believe this is a good enough tradeoff.
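
The numbering rule described in the issue (count up from 1 in depth-first post-order, with 0 reserved for free-floating stages) can be sketched on a toy plan tree. This is only an illustration under stated assumptions, not Spark's implementation: Plan, Op, CodegenStage, StageBoundary, assignIds, explainOps and generatedClassName are all hypothetical names invented for this sketch, and the tree below is a hand-built approximation of the example query's plan.

```scala
// Toy model only: these types are hypothetical stand-ins, not Spark classes.
object CodegenStageIdSketch {
  sealed trait Plan
  // A physical operator with its children.
  final case class Op(name: String, children: List[Plan] = Nil) extends Plan
  // A codegen stage wrapping the operators below it; id 0 means "free-floating".
  final case class CodegenStage(child: Plan, id: Int = 0) extends Plan
  // Marks where a codegen stage ends and its non-codegen'd input begins.
  final case class StageBoundary(child: Plan) extends Plan

  // Assign IDs 1, 2, 3, ... to stages in depth-first post-order:
  // a stage gets its ID only after every stage beneath it has one.
  def assignIds(plan: Plan): Plan = {
    var next = 0
    def go(p: Plan): Plan = p match {
      case Op(name, children) => Op(name, children.map(go))
      case StageBoundary(child) => StageBoundary(go(child))
      case CodegenStage(child, _) =>
        val numberedChild = go(child) // visit children first
        next += 1
        CodegenStage(numberedChild, next)
    }
    go(plan)
  }

  // List operators in explain order, prefixing codegen'd ones with "*(id) ".
  def explainOps(plan: Plan, stageId: Option[Int] = None): List[String] = plan match {
    case CodegenStage(child, id) => explainOps(child, Some(id))
    case StageBoundary(child) => explainOps(child, None)
    case Op(name, children) =>
      val prefix = stageId.map(id => s"*($id) ").getOrElse("")
      (prefix + name) :: children.flatMap(explainOps(_, stageId))
  }

  // Class-name suffix in the format described in the issue.
  def generatedClassName(id: Int): String =
    "GeneratedClass$GeneratedIterator$" + id

  // An Exchange is not codegen'd, so it sits behind a stage boundary.
  private def exchange(child: Plan): Plan =
    StageBoundary(Op("Exchange", List(child)))

  // Hand-built approximation of the example query's plan, all IDs still 0.
  val examplePlan: Plan = {
    val s1 = CodegenStage(Op("Project", List(Op("Range"))))
    val s2 = CodegenStage(Op("Project", List(Op("Sort", List(exchange(s1))))))
    val s3 = CodegenStage(Op("Sort", List(exchange(s2))))
    val s4 = CodegenStage(Op("Range"))
    val s5 = CodegenStage(Op("Sort", List(exchange(s4))))
    CodegenStage(Op("SortMergeJoin", List(StageBoundary(s3), StageBoundary(s5))))
  }

  def main(args: Array[String]): Unit = {
    val numbered = assignIds(examplePlan)
    explainOps(numbered).foreach(println)
    println(generatedClassName(6))
  }
}
```

Running main on this toy tree lists the operators with the same prefixes, in the same order, as the "after" explain output quoted in the issue. Because the numbering depends only on the shape of the plan, building the same plan twice yields the same IDs, which is the stability property the rationale relies on.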