Kostas Sakellis created SPARK-5674:
--------------------------------------

             Summary: Spark Job Explain Plan Proof of Concept
                 Key: SPARK-5674
                 URL: https://issues.apache.org/jira/browse/SPARK-5674
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
            Reporter: Kostas Sakellis


This is a prototype of generating an explain plan for a job. The code can be 
found here: https://github.com/ksakellis/spark/tree/kostas-explainPlan-poc It 
was written very quickly, so it has no comments or tests and is probably 
buggy - hence it being a proof of concept.

*How to Use*

# {code}sc.explainOn <=> sc.explainOff{code} Generates the explain plan and prints it in the logs.
# {code}sc.enableExecution <=> sc.disableExecution{code} Enables or disables execution of the job.

Using these two knobs, a user can print the explain plan and/or skip running 
the job if they only want to see the plan. See *Example Code* below for a 
complete snippet.


*Implementation*

This is only a prototype and it is by no means production ready. The code is 
pretty hacky in places and a few shortcuts were made just to get the prototype 
working.

The most interesting part of this commit is the ExecutionPlanner class (in 
ExecutionPlanner.scala). It creates its own private instance of the 
DAGScheduler and passes it a NoopTaskScheduler. The NoopTaskScheduler receives 
the TaskSets created by the DAGScheduler and records the stage -> TaskSet 
mapping. It also creates fake CompletionEvents and sends them back to the 
DAGScheduler to move the scheduling along. It is done this way so that we can 
use the DAGScheduler unmodified, reducing code divergence.
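
To illustrate the idea, here is a minimal sketch of what the NoopTaskScheduler does. The types below are simplified stand-ins invented for this sketch, not Spark's real internal APIs; the actual prototype implements Spark's private TaskScheduler trait.

{code}
// Simplified stand-ins for Spark's internal types (hypothetical):
case class TaskSet(stageId: Int, tasks: Seq[String])
case class CompletionEvent(stageId: Int, taskIndex: Int)

// Records every TaskSet instead of running it, then immediately feeds fake
// completion events back so the DAGScheduler keeps scheduling downstream stages.
class NoopTaskScheduler(postToDAGScheduler: CompletionEvent => Unit) {
  val stageToTaskSet = scala.collection.mutable.Map.empty[Int, TaskSet]

  def submitTasks(taskSet: TaskSet): Unit = {
    stageToTaskSet(taskSet.stageId) = taskSet    // record, never execute
    taskSet.tasks.indices.foreach { i =>         // pretend every task succeeded
      postToDAGScheduler(CompletionEvent(taskSet.stageId, i))
    }
  }
}
{code}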

The rest of the code processes the information produced by the 
ExecutionPlanner, builds a DAG annotated with metadata, and prints it as a 
pretty ASCII drawing. For drawing the DAG, https://github.com/mdr/ascii-graphs 
is used. Again, this was simply the easier route for a prototype.
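
For reference, rendering a graph with ascii-graphs looks roughly like this, following the project's README (package and API names may differ by version):

{code}
import com.github.mdr.ascii.layout._

// Build a small stage graph and render it as ASCII art.
val graph = Graph(
  vertices = List("Stage: 0 @ map", "Stage: 1 @ map", "Stage: 2 @ count"),
  edges = List(
    "Stage: 0 @ map" -> "Stage: 2 @ count",
    "Stage: 1 @ map" -> "Stage: 2 @ count"))

println(GraphLayout.renderGraph(graph))
{code}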

*How is this different from RDD#toDebugString?*

The execution planner runs the job through the entire DAGScheduler, so we can 
collect some metrics that are not presently available in the debugString. For 
example, we can report the binary size of each task, which might be important 
if the closures reference large objects.

In addition, because we execute the scheduler code from an action, we get a 
more accurate picture of where the stage boundaries and dependencies are. An 
action such as treeReduce will generate a number of stages that you can't see 
just by calling .toDebugString on the RDD.
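
To make the treeReduce point concrete (illustrative snippet): the lineage printed by toDebugString is a single RDD with no stage information, while the action itself schedules multiple aggregation rounds that only the scheduler sees.

{code}
val rdd = sc.parallelize(1 to 1000, 8)

// The lineage is one ParallelCollectionRDD line with no stage information:
println(rdd.toDebugString)

// treeReduce aggregates in multiple rounds, so the scheduler creates
// intermediate stages that never show up in the lineage above:
rdd.treeReduce((a, b) => a + b, depth = 2)
{code}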


*Limitations of this Implementation*

Because this is a prototype, there is a lot of lame stuff in this commit.

# All of the code in SparkContext in particular sucks. This adds some code to 
the runJob() call, and when it gets the plan it just writes it to the INFO 
log. We need a better way of exposing the plan to the caller so that they can 
print it, analyze it, etc. Maybe we can use implicits or something? Not sure 
how best to do this yet - see the sketch after this list.
# Some of the actions will throw exceptions because we are basically faking a 
runJob(). If you want to try this, it is best to use count() instead of, say, 
collect(). This will get fixed when we fix 1).
# Because the ExecutionPlanner creates its own DAGScheduler, there is 
currently no way to map the real stages to the "explainPlan" stages. So if a 
user turns on explain plan and doesn't disable execution, we can't 
automatically add more metrics to the explain plan as they become available. 
The stageId in the plan and the stageId in the real scheduler will be 
different. This will matter when we add the plan to the webUI so users can 
track progress on the DAG.
# We are using https://github.com/mdr/ascii-graphs to draw the DAG - not sure 
if we want to depend on that project.
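
For limitation 1, one possible direction is an implicit enrichment on SparkContext. This is a purely hypothetical sketch - every name below is made up and nothing here exists in the prototype; it would be pasted into spark-shell where sc is in scope:

{code}
import org.apache.spark.SparkContext

// The rendered plan the planner would hand back instead of logging it:
case class ExplainPlan(rendered: String)

// The planner would publish its result here after intercepting runJob():
object PlanHolder {
  @volatile var last: Option[ExplainPlan] = None
}

// Implicit enrichment so callers can pull the plan off the SparkContext:
implicit class ExplainOps(sc: SparkContext) {
  def lastExplainPlan: Option[ExplainPlan] = PlanHolder.last
}

// Caller side:
//   sc.lastExplainPlan.foreach(plan => println(plan.rendered))
{code}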

*Next Steps*

# It would be good to get a few people to take a look at the code, 
specifically at how the plan gets generated. Clone the branch and give it a 
try with some of your jobs.
# If the approach looks okay overall, I can put together a mini design doc and 
address the above limitations of this approach.
# Feedback most welcome.

*Example Code:*

{code}
sc.explainOn
sc.disableExecution

val rdd = sc.parallelize(1 to 10, 4).map(key => (key.toString, key))
val rdd2 = sc.parallelize(1 to 5, 2).map(key => (key.toString, key))
rdd.join(rdd2)
   .count()
{code}


*Example Output:*

{noformat}
EXPLAIN PLAN:

 +---------------+ +---------------+
 |               | |               |
 |Stage: 0 @ map | |Stage: 1 @ map |
 |   Tasks: 4    | |   Tasks: 2    |
 |               | |               |
 +---------------+ +---------------+
               |     |
               v     v
         +-----------------+
         |                 |
         |Stage: 2 @ count |
         |    Tasks: 4     |
         |                 |
         +-----------------+

 STAGE DETAILS:
 --------------

 Stage: 0
  Callsite: map at <console>:12

  ShuffleMapTask
   PartitionId: 0 Type: ParallelCollectionPartition
   Binary Size: 2.2 KB

  ShuffleMapTask
   PartitionId: 1 Type: ParallelCollectionPartition
   Binary Size: 2.2 KB

  ShuffleMapTask
   PartitionId: 2 Type: ParallelCollectionPartition
   Binary Size: 2.2 KB

  ShuffleMapTask
   PartitionId: 3 Type: ParallelCollectionPartition
   Binary Size: 2.2 KB

  RDD Chain:
 +---------------------+
 |                     |
 |RDD: 0 @ parallelize |
 |ParallelCollectionRDD|
 |                     |
 +---------------------+
            |
            v
   +----------------+
   |                |
   | RDD: 1 @ map   |
   |MapPartitionsRDD|
   |                |
   +----------------+

 Stage: 1
  Callsite: map at <console>:12

  ShuffleMapTask
   PartitionId: 0 Type: ParallelCollectionPartition
   Binary Size: 2.2 KB

  ShuffleMapTask
   PartitionId: 1 Type: ParallelCollectionPartition
   Binary Size: 2.2 KB

  RDD Chain:
 +---------------------+
 |                     |
 |RDD: 2 @ parallelize |
 |ParallelCollectionRDD|
 |                     |
 +---------------------+
            |
            v
   +----------------+
   |                |
   | RDD: 3 @ map   |
   |MapPartitionsRDD|
   |                |
   +----------------+

 Stage: 2
  Callsite: count at <console>:19

  ResultTask
   PartitionId: 0 Type: CoGroupPartition
   Binary Size: 2.4 KB

  ResultTask
   PartitionId: 1 Type: CoGroupPartition
   Binary Size: 2.4 KB

  ResultTask
   PartitionId: 2 Type: CoGroupPartition
   Binary Size: 2.4 KB

  ResultTask
   PartitionId: 3 Type: CoGroupPartition
   Binary Size: 2.4 KB

  RDD Chain:
  +--------------+
  |              |
  |RDD: 4 @ join |
  | CoGroupedRDD |
  |              |
  +--------------+
          |
          v
 +----------------+
 |                |
 | RDD: 5 @ join  |
 |MapPartitionsRDD|
 |                |
 +----------------+
          |
          v
 +----------------+
 |                |
 | RDD: 6 @ join  |
 |MapPartitionsRDD|
 |                |
 +----------------+
{noformat}


