Kris Mok created SPARK-23032:
--------------------------------

             Summary: Add a per-query codegenStageId to WholeStageCodegenExec
                 Key: SPARK-23032
                 URL: https://issues.apache.org/jira/browse/SPARK-23032
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.3.0
            Reporter: Kris Mok


Proposing to add a per-query ID to the codegen stages as represented by 
{{WholeStageCodegenExec}} operators. This ID will be used in
* the explain output of the physical plan, and in
* the generated class name.

Specifically, this ID will be stable within a query, counting up from 1 in 
depth-first post-order over all the {{WholeStageCodegenExec}} operators inserted 
into a plan.
The ID value 0 is reserved for "free-floating" {{WholeStageCodegenExec}} 
objects, which may have been created for one-off purposes, e.g. for fallback 
handling when codegen of a whole stage fails and only a subset of the child 
operators is codegen'd.
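
To make the numbering concrete, here's a minimal, self-contained sketch of the 
post-order counting on a toy plan tree (the classes below are illustrative 
stand-ins, not Spark's actual internals):
{code:scala}
import java.util.concurrent.atomic.AtomicInteger

// Illustrative stand-ins for physical plan nodes, not Spark's real classes.
// id = 0 is the default, matching the value reserved for "free-floating" stages.
sealed trait Plan
case class Stage(child: Plan, id: Int = 0) extends Plan
case class Exchange(child: Plan) extends Plan
case object Leaf extends Plan

// Depth-first post-order: number a node's subtree before the node itself,
// so IDs count up from 1 starting at the deepest codegen stage.
def number(plan: Plan, counter: AtomicInteger = new AtomicInteger(0)): Plan =
  plan match {
    case Stage(child, _) => Stage(number(child, counter), counter.incrementAndGet())
    case Exchange(child) => Exchange(number(child, counter))
    case Leaf            => Leaf
  }

// Stage(Exchange(Stage(Leaf))) => Stage(Exchange(Stage(Leaf, 1)), 2)
{code}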

Example: for the following query:
{code:none}
scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1)

scala> val df1 = spark.range(10).select('id as 'x, 'id + 1 as 'y).orderBy('x).select('x + 1 as 'z, 'y)
df1: org.apache.spark.sql.DataFrame = [z: bigint, y: bigint]

scala> val df2 = spark.range(5)
df2: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> val query = df1.join(df2, 'z === 'id)
query: org.apache.spark.sql.DataFrame = [z: bigint, y: bigint ... 1 more field]
{code}

The explain output before the change is:
{code:none}
scala> query.explain
== Physical Plan ==
*SortMergeJoin [z#9L], [id#13L], Inner
:- *Sort [z#9L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(z#9L, 200)
:     +- *Project [(x#3L + 1) AS z#9L, y#4L]
:        +- *Sort [x#3L ASC NULLS FIRST], true, 0
:           +- Exchange rangepartitioning(x#3L ASC NULLS FIRST, 200)
:              +- *Project [id#0L AS x#3L, (id#0L + 1) AS y#4L]
:                 +- *Range (0, 10, step=1, splits=8)
+- *Sort [id#13L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#13L, 200)
      +- *Range (0, 5, step=1, splits=8)
{code}
Note how codegen'd operators are annotated with a {{"*"}} prefix.

After this change, the output becomes:
{code:none}
scala> query.explain
== Physical Plan ==
*(6) SortMergeJoin [z#9L], [id#13L], Inner
:- *(3) Sort [z#9L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(z#9L, 200)
:     +- *(2) Project [(x#3L + 1) AS z#9L, y#4L]
:        +- *(2) Sort [x#3L ASC NULLS FIRST], true, 0
:           +- Exchange rangepartitioning(x#3L ASC NULLS FIRST, 200)
:              +- *(1) Project [id#0L AS x#3L, (id#0L + 1) AS y#4L]
:                 +- *(1) Range (0, 10, step=1, splits=8)
+- *(5) Sort [id#13L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#13L, 200)
      +- *(4) Range (0, 5, step=1, splits=8)
{code}
Note that the annotated prefix becomes {{"*(id) "}}, where {{id}} is the 
codegen stage ID.

The ID will also show up in the name of the generated class, as a suffix in 
the format of
{code:none}
GeneratedClass$GeneratedIterator$id
{code}
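
For illustration, such a suffix could be formed with a simple string 
interpolation (a hypothetical helper, not necessarily how the actual patch 
wires it in):
{code:scala}
// Hypothetical sketch: derive the generated class name from the stage ID.
// In an s-interpolator, each "$$" yields one literal '$' in the output.
def generatedIteratorName(codegenStageId: Int): String =
  s"GeneratedClass$$GeneratedIterator$$$codegenStageId"

// generatedIteratorName(3) == "GeneratedClass$GeneratedIterator$3"
{code}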

For example, note how {{GeneratedClass$GeneratedIterator$3}} and 
{{GeneratedClass$GeneratedIterator$6}} in the following stack trace correspond 
to the IDs shown in the explain output above:
{code:none}
"Executor task launch worker for task 424@12957" daemon prio=5 tid=0x58 nid=NA 
runnable
  java.lang.Thread.State: RUNNABLE
          at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109)
          at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$3.sort_addToSorter$(generated.java:32)
          at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$3.processNext(generated.java:41)
          at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
          at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$9$$anon$1.hasNext(WholeStageCodegenExec.scala:494)
          at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$6.findNextInnerJoinRows$(generated.java:42)
          at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$6.processNext(generated.java:101)
          at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
          at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$2.hasNext(WholeStageCodegenExec.scala:513)
          at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
          at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
          at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
          at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
          at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
          at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
          at org.apache.spark.scheduler.Task.run(Task.scala:109)
          at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
          at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:748)
{code}
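
As a further illustration, a tiny helper like the following (hypothetical, not 
part of Spark) could map such a frame back to its codegen stage ID:
{code:scala}
// Hypothetical diagnostic helper: extract the per-query codegen stage ID
// from a generated class name appearing in a stack trace frame.
val StageId = """GeneratedClass\$GeneratedIterator\$(\d+)""".r.unanchored

def codegenStageId(frameClassName: String): Option[Int] = frameClassName match {
  case StageId(id) => Some(id.toInt)
  case _           => None
}

// Maps the frame above back to "*(6) SortMergeJoin" in the explain output.
assert(codegenStageId(
  "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator$6"
).contains(6))
{code}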

Rationale:

Right now, the codegen framework in Spark SQL lacks the means to differentiate 
two things:

1. It's hard to tell which physical operators are in the same WholeStageCodegen 
stage. Note that this "stage" is a separate notion from Spark's RDD execution 
stages; it only delineates codegen units.
There can be adjacent physical operators that are both codegen'd but sit in 
separate codegen stages. Some of this is due to hacky implementation details, 
such as the case with SortMergeJoin and its Sort inputs: they're hard-coded to 
be split into separate stages even though both are codegen'd.
When printing the explain output of the physical plan, you'd only see the 
codegen'd physical operators annotated with a preceding star ({{*}}) but would 
have no way to tell whether they're in the same stage.

2. Performance/error diagnosis is hard.
The generated code has class/method names that are hard to differentiate 
between queries, or even between codegen stages within the same query. If we 
use a Java-level profiler to collect profiles, or if we encounter a Java-level 
exception with a stack trace in it, it's really hard to tell which part of a 
query it corresponds to.
By introducing a per-query codegen stage ID, we'd at least know in which 
codegen stage (and in turn, which group of physical operators) a profile tick 
or an exception happened.

This proposal uses a per-query ID because it's stable within a query: multiple 
runs of the same query will see the same resulting IDs. This benefits 
understandability for users, and it also plays well with the codegen cache in 
Spark SQL, which uses the generated source code as the key; a globally 
incrementing ID would change the generated source on every run and defeat that 
cache.

The downside of per-query IDs, as opposed to a per-session or globally 
incrementing ID, is of course that we can't tell different query runs apart 
with this ID alone. But for now I believe this is a good enough tradeoff.


