[ 
https://issues.apache.org/jira/browse/SPARK-5674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-5674.
---------------------------------
    Resolution: Incomplete

> Spark Job Explain Plan Proof of Concept
> ---------------------------------------
>
>                 Key: SPARK-5674
>                 URL: https://issues.apache.org/jira/browse/SPARK-5674
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>            Reporter: Kostas Sakellis
>            Priority: Major
>              Labels: bulk-closed
>
> This is just a prototype of creating an explain plan for a job. Code can be 
> found here: https://github.com/ksakellis/spark/tree/kostas-explainPlan-poc 
> The code was written very quickly, so it has no comments or tests and 
> is probably buggy - hence it being a proof of concept.
> *How to Use*
> # {code}sc.explainOn <=> sc.explainOff{code} This turns generation of the 
> explain plan on or off. When it is on, the plan is printed in the logs.
> # {code}sc.enableExecution <=> sc.disableExecution{code} This enables or 
> disables execution of the job.
> Using these two knobs a user can choose to print the explain plan and/or 
> disable the running of the job if they only want to see the plan.
> *Implementation*
> This is only a prototype and it is by no means production ready. The code is 
> pretty hacky in places and a few shortcuts were made just to get the 
> prototype working.
> The most interesting part of this commit is in the ExecutionPlanner.scala 
> class. This class creates its own private instance of the DAGScheduler and 
> passes into it a NoopTaskScheduler. The NoopTaskScheduler receives the 
> created TaskSets from the DAGScheduler and records the stages -> tasksets. 
> The NoopTaskScheduler also creates fake CompletionEvents and sends them to 
> the DAGScheduler to move the scheduling along. It is done this way so that we 
> can use the DAGScheduler unmodified, thus reducing code divergence.
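
The no-op scheduler idea described above can be illustrated with a toy model. This is only a sketch under stated assumptions: Stage, TaskSet, TaskBackend, NoopBackend and ToyScheduler are hypothetical stand-ins invented for illustration, not Spark's DAGScheduler, TaskScheduler or TaskSet, and the real prototype may work differently in detail.

```scala
import scala.collection.mutable

// Toy model only: every type here is a hypothetical stand-in,
// not a class from Spark or from the prototype branch.
object NoopSchedulerSketch {
  final case class Stage(id: Int, name: String, numTasks: Int, parents: List[Stage] = Nil)
  final case class TaskSet(stageId: Int, taskIds: Vector[Int])

  trait TaskBackend {
    // Receives a task set and calls onTaskDone(stageId, taskId) once per finished task.
    def submit(taskSet: TaskSet, onTaskDone: (Int, Int) => Unit): Unit
  }

  // Runs nothing: records each task set, then immediately reports a fake
  // completion for every task so that the scheduler keeps moving.
  final class NoopBackend extends TaskBackend {
    val recorded = mutable.LinkedHashMap.empty[Int, TaskSet]

    def submit(taskSet: TaskSet, onTaskDone: (Int, Int) => Unit): Unit = {
      recorded(taskSet.stageId) = taskSet
      taskSet.taskIds.foreach(taskId => onTaskDone(taskSet.stageId, taskId))
    }
  }

  // Simplified scheduler: unfinished parent stages are submitted first, and a
  // stage counts as finished once all of its tasks have reported completion.
  final class ToyScheduler(backend: TaskBackend) {
    private val pending = mutable.Map.empty[Int, Int] // stageId -> outstanding tasks
    private val finished = mutable.Set.empty[Int]

    def isFinished(stageId: Int): Boolean = finished(stageId)

    def run(stage: Stage): Unit = {
      stage.parents.filterNot(p => finished(p.id)).foreach(p => run(p))
      require(stage.parents.forall(p => finished(p.id)), "parent stages must finish first")
      pending(stage.id) = stage.numTasks
      val taskSet = TaskSet(stage.id, (0 until stage.numTasks).toVector)
      backend.submit(taskSet, (stageId, taskId) => taskDone(stageId, taskId))
    }

    private def taskDone(stageId: Int, taskId: Int): Unit = {
      pending(stageId) = pending(stageId) - 1
      if (pending(stageId) == 0) finished += stageId
    }
  }
}
```

Feeding this sketch three stages shaped like the example output further down (two parent stages with 4 and 2 tasks, and a final stage with 4 tasks) leaves task sets for stages 0, 1 and 2 in `recorded`, in that order, without running any task. The scheduler itself does not know that the backend is a fake, which is the point of the approach.
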
> The rest of the code is about processing the information produced by the 
> ExecutionPlanner, creating a DAG with a bunch of metadata and printing it as 
> a pretty ASCII drawing. The DAG is drawn with 
> https://github.com/mdr/ascii-graphs, again because that was the easiest way 
> to prototype.
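
To give a feel for the drawing step, here is a minimal sketch that renders a single vertex as a padded box in plain Scala. It is not the ascii-graphs API and does no edge layout; BoxSketch and box are hypothetical names used only for illustration, and the exact padding differs from what the library produces.

```scala
object BoxSketch {
  // Draws one vertex as a box: a border, one blank padding row above and
  // below, and each text line centered to the width of the longest line.
  def box(lines: Seq[String]): String = {
    require(lines.nonEmpty, "a box needs at least one line of text")
    val width = lines.map(_.length).max

    def center(s: String): String = {
      val left = (width - s.length) / 2
      val right = width - s.length - left
      (" " * left) + s + (" " * right)
    }

    val border = "+" + ("-" * width) + "+"
    val blank = "|" + (" " * width) + "|"
    val body = lines.map(l => "|" + center(l) + "|")
    (Seq(border, blank) ++ body ++ Seq(blank, border)).mkString("\n")
  }
}
```

Calling `BoxSketch.box(Seq("Stage: 2 @ count", "Tasks: 4"))` yields a six-line box. Connecting several boxes with arrows is the part that a layout library takes care of.
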
> *How is this different than RDD#toDebugString?*
> The execution planner runs the job through the entire DAGScheduler so we can 
> collect some metrics that are not presently available in the debugString. For 
> example, we can report the binary size of the task which might be important 
> if the closures are referencing large objects.
> In addition, because we execute the scheduler code from an action, we can get 
> a more accurate picture of where the stage boundaries and dependencies are. An 
> action such as treeReduce will generate a number of stages that you can't 
> see just by calling .toDebugString on the RDD.
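
Why the binary size matters can be shown without Spark at all. The sketch below measures an object under plain Java serialization; OffsetTask is a hypothetical stand-in for a task closure that captures some data, and the way the prototype actually computes its "Binary Size" figure is not shown in the issue, so treat this as an illustration of the effect rather than of the implementation.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object TaskSizeSketch {
  // Hypothetical stand-in for a task closure: whatever it references
  // (here an array) gets serialized along with it.
  final class OffsetTask(val offsets: Array[Int]) extends Serializable {
    def run(i: Int): Int = i + offsets(i % offsets.length)
  }

  // Size in bytes of the object under plain Java serialization.
  def serializedSize(obj: AnyRef): Int = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.size()
  }
}
```

A task that holds a three-element array serializes to a small payload, while the same task holding `new Array[Int](100000)` serializes to more than 400,000 bytes, because every captured element is written out. A per-task size in the plan makes this kind of accidental capture visible before the job runs.
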
> *Limitations of this Implementation*
> Because this is a prototype, there is a lot of lame stuff in this commit.
> # All of the code in SparkContext in particular sucks. This adds some code in 
> the runJob() call and when it gets the plan it just writes it to the INFO 
> log. We need to find a better way of exposing the plan to the caller so that 
> they can print it, analyze it etc. Maybe we can use implicits or something? 
> Not sure how best to do this yet.
> # Some of the actions will throw exceptions because we are basically 
> faking a runJob(). If you want to try this, it is best to just use count() 
> instead of, say, collect(). This will get fixed when we fix item 1.
> # Because the ExecutionPlanner creates its own DAGScheduler, there currently is 
> no way to map the real stages to the "explainPlan" stages. So if a user turns 
> on explain plan, and doesn't disable execution, we can't automatically add 
> more metrics to the explain plan as they become available. The stageId in the 
> plan and the stageId in the real scheduler will be different. This will 
> matter once we add the plan to the web UI so that users can track progress on 
> the DAG.
> # We are using https://github.com/mdr/ascii-graphs to draw the DAG - not sure 
> if we want to depend on that project.
> *Next Steps*
> # It would be good to get a few people to take a look at the code, 
> specifically at how the plan gets generated. Clone the package and give it a 
> try with some of your jobs.
> # If the approach looks okay overall, I can put together a mini design doc 
> and add some answers to the above limitations of this approach. 
> # Feedback is most welcome.
> *Example Code:*
> {code}
> sc.explainOn
> sc.disableExecution
> val rdd = sc.parallelize(1 to 10, 4).map(key => (key.toString, key))
> val rdd2 = sc.parallelize(1 to 5, 2).map(key => (key.toString, key))
> rdd.join(rdd2)
>    .count()
> {code}
> *Example Output:*
> {noformat}
> EXPLAIN PLAN:
>  +---------------+ +---------------+
>  |               | |               |
>  |Stage: 0 @ map | |Stage: 1 @ map |
>  |   Tasks: 4    | |   Tasks: 2    |
>  |               | |               |
>  +---------------+ +---------------+
>                |     |
>                v     v
>          +-----------------+
>          |                 |
>          |Stage: 2 @ count |
>          |    Tasks: 4     |
>          |                 |
>          +-----------------+
>  STAGE DETAILS:
>  --------------
>  Stage: 0
>   Callsite: map at <console>:12
>   ShuffleMapTask
>    PartitionId: 0 Type: ParallelCollectionPartition
>    Binary Size: 2.2 KB
>   ShuffleMapTask
>    PartitionId: 1 Type: ParallelCollectionPartition
>    Binary Size: 2.2 KB
>   ShuffleMapTask
>    PartitionId: 2 Type: ParallelCollectionPartition
>    Binary Size: 2.2 KB
>   ShuffleMapTask
>    PartitionId: 3 Type: ParallelCollectionPartition
>    Binary Size: 2.2 KB
>   RDD Chain:
>  +---------------------+
>  |                     |
>  |RDD: 0 @ parallelize |
>  |ParallelCollectionRDD|
>  |                     |
>  +---------------------+
>             |
>             v
>    +----------------+
>    |                |
>    | RDD: 1 @ map   |
>    |MapPartitionsRDD|
>    |                |
>    +----------------+
>  Stage: 1
>   Callsite: map at <console>:12
>   ShuffleMapTask
>    PartitionId: 0 Type: ParallelCollectionPartition
>    Binary Size: 2.2 KB
>   ShuffleMapTask
>    PartitionId: 1 Type: ParallelCollectionPartition
>    Binary Size: 2.2 KB
>   RDD Chain:
>  +---------------------+
>  |                     |
>  |RDD: 2 @ parallelize |
>  |ParallelCollectionRDD|
>  |                     |
>  +---------------------+
>             |
>             v
>    +----------------+
>    |                |
>    | RDD: 3 @ map   |
>    |MapPartitionsRDD|
>    |                |
>    +----------------+
>  Stage: 2
>   Callsite: count at <console>:19
>   ResultTask
>    PartitionId: 0 Type: CoGroupPartition
>    Binary Size: 2.4 KB
>   ResultTask
>    PartitionId: 1 Type: CoGroupPartition
>    Binary Size: 2.4 KB
>   ResultTask
>    PartitionId: 2 Type: CoGroupPartition
>    Binary Size: 2.4 KB
>   ResultTask
>    PartitionId: 3 Type: CoGroupPartition
>    Binary Size: 2.4 KB
>   RDD Chain:
>   +--------------+
>   |              |
>   |RDD: 4 @ join |
>   | CoGroupedRDD |
>   |              |
>   +--------------+
>           |
>           v
>  +----------------+
>  |                |
>  | RDD: 5 @ join  |
>  |MapPartitionsRDD|
>  |                |
>  +----------------+
>           |
>           v
>  +----------------+
>  |                |
>  | RDD: 6 @ join  |
>  |MapPartitionsRDD|
>  |                |
>  +----------------+
> {noformat}


