Kostas Sakellis created SPARK-5674:
--------------------------------------

             Summary: Spark Job Explain Plan Proof of Concept
                 Key: SPARK-5674
                 URL: https://issues.apache.org/jira/browse/SPARK-5674
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
            Reporter: Kostas Sakellis
This is just a prototype of an explain plan for a Spark job. The code can be found here: https://github.com/ksakellis/spark/tree/kostas-explainPlan-poc

The code was written very quickly, so it doesn't have any comments or tests and is probably buggy - hence it being a proof of concept.

*How to Use*
# {code}sc.explainOn <=> sc.explainOff{code} This generates the explain plan and prints it in the logs.
# {code}sc.enableExecution <=> sc.disableExecution{code} This disables execution of the job.

Using these two knobs, a user can choose to print the explain plan and/or disable the running of the job if they only want to see the plan.

*Implementation*

This is only a prototype and is by no means production ready. The code is pretty hacky in places, and a few shortcuts were taken just to get the prototype working.

The most interesting part of this commit is the ExecutionPlanner.scala class. This class creates its own private instance of the DAGScheduler and passes into it a NoopTaskScheduler. The NoopTaskScheduler receives the TaskSets created by the DAGScheduler and records the stage -> TaskSet mapping. The NoopTaskScheduler also creates fake CompletionEvents and sends them back to the DAGScheduler to move the scheduling along. It is done this way so that we can use the DAGScheduler unmodified, thus reducing code divergence.

The rest of the code processes the information produced by the ExecutionPlanner, creating a DAG with a bunch of metadata and printing it as a pretty ASCII drawing. For drawing the DAG, https://github.com/mdr/ascii-graphs is used; again, this was just easier for prototyping.

*How is this different than RDD#toDebugString?*

The execution planner runs the job through the entire DAGScheduler, so we can collect some metrics that are not presently available in the debug string. For example, we can report the binary size of each task, which might be important if the closures are referencing large objects.
In addition, because we execute the scheduler code from an action, we get a more accurate picture of where the stage boundaries and dependencies are. An action such as treeReduce will generate a number of stages that you can't see just by calling .toDebugString on the RDD.

*Limitations of this Implementation*

Because this is a prototype, there is a lot of lame stuff in this commit.
# All of the code in SparkContext in particular sucks. It adds some code to the runJob() call and, when it gets the plan, just writes it to the INFO log. We need to find a better way of exposing the plan to the caller so that they can print it, analyze it, etc. Maybe we can use implicits or something? Not sure how best to do this yet.
# Some of the actions will throw exceptions because we are basically faking a runJob(). If you want to try this, it is best to use count() instead of, say, collect(). This will get fixed when we fix 1).
# Because the ExecutionPlanner creates its own DAGScheduler, there is currently no way to map the real stages to the "explain plan" stages. So if a user turns on the explain plan and doesn't disable execution, we can't automatically add more metrics to the explain plan as they become available: the stage ids in the plan and the stage ids in the real scheduler will be different. This matters when we add the plan to the web UI and users can track progress on the DAG.
# We are using https://github.com/mdr/ascii-graphs to draw the DAG - not sure if we want to depend on that project.

*Next Steps*
# It would be good to get a few people to take a look at the code, specifically at how the plan gets generated. Clone the branch and give it a try with some of your jobs.
# If the approach looks okay overall, I can put together a mini design doc and add some answers to the above limitations of this approach.

Feedback is most welcome.
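To make the scheduler-stubbing idea under *Implementation* concrete, here is a very loose, self-contained sketch of what a "noop" task scheduler does: record the task sets each stage would submit, and immediately report fake successes so planning keeps moving without running anything. Note that {{TaskSet}}, {{NoopTaskScheduler}}, and the completion callback below are simplified, hypothetical stand-ins for illustration only - they are not the real Spark TaskScheduler trait or the code in the branch.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's TaskSet.
case class TaskSet(stageId: Int, numTasks: Int)

// Records submitted task sets and fakes a completion event for every task,
// so the (stubbed) DAG scheduler can keep planning downstream stages
// without executing anything.
class NoopTaskScheduler(onTaskDone: (Int, Int) => Unit) {
  // stageId -> task set, filled in as the planner "submits" work
  val recorded = mutable.Map[Int, TaskSet]()

  def submitTasks(ts: TaskSet): Unit = {
    recorded(ts.stageId) = ts
    // Fake one completion per partition instead of running the tasks.
    (0 until ts.numTasks).foreach(pid => onTaskDone(ts.stageId, pid))
  }
}

object ExplainPlanSketch {
  def main(args: Array[String]): Unit = {
    val completions = mutable.ArrayBuffer[(Int, Int)]()
    val sched = new NoopTaskScheduler((stage, pid) => completions += ((stage, pid)))
    // Two "map" stages from the example job: 4 and 2 partitions.
    sched.submitTasks(TaskSet(stageId = 0, numTasks = 4))
    sched.submitTasks(TaskSet(stageId = 1, numTasks = 2))
    println(sched.recorded.keys.toList.sorted) // stages seen by the planner
    println(completions.size)                  // fake completion events sent
  }
}
```

In the real prototype the fake completions are sent back into an unmodified DAGScheduler; the point of the sketch is only the shape of the trick, not the API.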
*Example Code:*
{code}
sc.explainOn
sc.disableExecution

val rdd = sc.parallelize(1 to 10, 4).map(key => (key.toString, key))
val rdd2 = sc.parallelize(1 to 5, 2).map(key => (key.toString, key))
rdd.join(rdd2)
   .count()
{code}

*Example Output:*
{noformat}
EXPLAIN PLAN:

+---------------+  +---------------+
|               |  |               |
|Stage: 0 @ map |  |Stage: 1 @ map |
|   Tasks: 4    |  |   Tasks: 2    |
|               |  |               |
+---------------+  +---------------+
        |                  |
        v                  v
      +-----------------+
      |                 |
      |Stage: 2 @ count |
      |    Tasks: 4     |
      |                 |
      +-----------------+

STAGE DETAILS:
--------------
Stage: 0 Callsite: map at <console>:12
  ShuffleMapTask PartitionId: 0 Type: ParallelCollectionPartition Binary Size: 2.2 KB
  ShuffleMapTask PartitionId: 1 Type: ParallelCollectionPartition Binary Size: 2.2 KB
  ShuffleMapTask PartitionId: 2 Type: ParallelCollectionPartition Binary Size: 2.2 KB
  ShuffleMapTask PartitionId: 3 Type: ParallelCollectionPartition Binary Size: 2.2 KB
  RDD Chain:
  +---------------------+
  |                     |
  |RDD: 0 @ parallelize |
  |ParallelCollectionRDD|
  |                     |
  +---------------------+
             |
             v
    +----------------+
    |                |
    |  RDD: 1 @ map  |
    |MapPartitionsRDD|
    |                |
    +----------------+

Stage: 1 Callsite: map at <console>:12
  ShuffleMapTask PartitionId: 0 Type: ParallelCollectionPartition Binary Size: 2.2 KB
  ShuffleMapTask PartitionId: 1 Type: ParallelCollectionPartition Binary Size: 2.2 KB
  RDD Chain:
  +---------------------+
  |                     |
  |RDD: 2 @ parallelize |
  |ParallelCollectionRDD|
  |                     |
  +---------------------+
             |
             v
    +----------------+
    |                |
    |  RDD: 3 @ map  |
    |MapPartitionsRDD|
    |                |
    +----------------+

Stage: 2 Callsite: count at <console>:19
  ResultTask PartitionId: 0 Type: CoGroupPartition Binary Size: 2.4 KB
  ResultTask PartitionId: 1 Type: CoGroupPartition Binary Size: 2.4 KB
  ResultTask PartitionId: 2 Type: CoGroupPartition Binary Size: 2.4 KB
  ResultTask PartitionId: 3 Type: CoGroupPartition Binary Size: 2.4 KB
  RDD Chain:
  +--------------+
  |              |
  |RDD: 4 @ join |
  | CoGroupedRDD |
  |              |
  +--------------+
         |
         v
  +----------------+
  |                |
  | RDD: 5 @ join  |
  |MapPartitionsRDD|
  |                |
  +----------------+
         |
         v
  +----------------+
  |                |
  | RDD: 6 @ join  |
  |MapPartitionsRDD|
  |                |
  +----------------+
{noformat}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)