[ https://issues.apache.org/jira/browse/SPARK-5674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-5674.
---------------------------------
    Resolution: Incomplete

> Spark Job Explain Plan Proof of Concept
> ---------------------------------------
>
>                 Key: SPARK-5674
>                 URL: https://issues.apache.org/jira/browse/SPARK-5674
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>            Reporter: Kostas Sakellis
>            Priority: Major
>              Labels: bulk-closed
>
> This is just a prototype of creating an explain plan for a job. Code can be
> found here: https://github.com/ksakellis/spark/tree/kostas-explainPlan-poc
> The code was written very quickly, so it has no comments or tests and is
> probably buggy - hence it being a proof of concept.
> *How to Use*
> # {code}sc.explainOn <=> sc.explainOff{code} This will generate the explain
> plan and print it in the logs.
> # {code}sc.enableExecution <=> sc.disableExecution{code} This will enable or
> disable actually executing the job.
> Using these two knobs, a user can print the explain plan and/or disable
> running the job if they only want to see the plan.
> *Implementation*
> This is only a prototype and is by no means production ready. The code is
> hacky in places, and a few shortcuts were taken just to get the prototype
> working.
> The most interesting part of this commit is the ExecutionPlanner.scala
> class. This class creates its own private instance of the DAGScheduler and
> passes it a NoopTaskScheduler. The NoopTaskScheduler receives the TaskSets
> created by the DAGScheduler and records the stage -> taskset mapping. The
> NoopTaskScheduler also creates fake CompletionEvents and sends them to the
> DAGScheduler to move the scheduling along. It is done this way so that we
> can use the DAGScheduler unmodified, reducing code divergence.
> The rest of the code processes the information produced by the
> ExecutionPlanner, building a DAG annotated with metadata and printing it as
> a pretty ASCII drawing.
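The record-and-fake-completions pattern described above can be sketched in miniature. This is a toy model under assumed names (TaskSet, NoopTaskScheduler, ExplainPlanDemo are all illustrative here, not Spark's real scheduler API): a scheduler stand-in records each submitted task set instead of running it, then immediately reports every task finished so a DAG-scheduler-like driver keeps planning downstream stages.

```scala
import scala.collection.mutable

// Toy stand-in for a Spark TaskSet: a stage's id and its task count.
case class TaskSet(stageId: Int, numTasks: Int)

// Records each stage's task set instead of executing it, and immediately
// reports every task as done via `onTaskDone` (the "fake CompletionEvent"),
// so the driver keeps submitting downstream stages without running anything.
class NoopTaskScheduler(onTaskDone: (Int, Int) => Unit) {
  val recorded = mutable.LinkedHashMap[Int, TaskSet]()
  def submitTasks(ts: TaskSet): Unit = {
    recorded(ts.stageId) = ts
    for (p <- 0 until ts.numTasks) onTaskDone(ts.stageId, p) // fake completion
  }
}

object ExplainPlanDemo {
  // Mimics the example job below: two 'map' stages feeding a 'count' stage.
  // Returns (stages planned, fake completions sent back to the driver).
  def plan(): (Int, Int) = {
    val completions = mutable.Buffer[(Int, Int)]()
    val sched = new NoopTaskScheduler((stage, part) => completions += ((stage, part)))
    Seq(TaskSet(0, 4), TaskSet(1, 2), TaskSet(2, 4)).foreach(sched.submitTasks)
    (sched.recorded.size, completions.size)
  }

  def main(args: Array[String]): Unit = {
    val (stages, fakes) = plan()
    println(s"planned $stages stages with $fakes fake task completions")
  }
}
```

In the real prototype the driver side is an unmodified DAGScheduler, which is exactly the point: only the scheduler backend is swapped out, so the planned stages match what the real scheduler would build.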
> For drawing the DAG, https://github.com/mdr/ascii-graphs is used. Again,
> this was just easier for prototyping.
> *How is this different than RDD#toDebugString?*
> The execution planner runs the job through the entire DAGScheduler, so we
> can collect metrics that are not presently available in the debug string.
> For example, we can report the binary size of each task, which matters if
> the closures reference large objects.
> In addition, because we execute the scheduler code from an action, we get a
> more accurate picture of where the stage boundaries and dependencies lie.
> An action such as treeReduce will generate a number of stages that you
> can't see just by calling .toDebugString on the RDD.
> *Limitations of this Implementation*
> Because this is a prototype, there is a lot of lame stuff in this commit.
> # All of the code in SparkContext in particular sucks. It adds some code to
> the runJob() call and, when it gets the plan, just writes it to the INFO
> log. We need a better way of exposing the plan to the caller so that they
> can print it, analyze it, etc. Maybe we can use implicits or something? Not
> sure how best to do this yet.
> # Some of the actions will return through exceptions because we are
> basically faking a runJob(). If you want to try this, it is best to use
> count() instead of, say, collect(). This will get fixed when we fix 1).
> # Because the ExplainPlanner creates its own DAGScheduler, there is
> currently no way to map the real stages to the "explain plan" stages. So if
> a user turns on the explain plan and doesn't disable execution, we can't
> automatically add more metrics to the explain plan as they become
> available. The stageId in the plan and the stageId in the real scheduler
> will be different.
> This is important for when we add it to the webUI so that users can track
> progress on the DAG.
> # We are using https://github.com/mdr/ascii-graphs to draw the DAG - not
> sure if we want to depend on that project.
> *Next Steps*
> # It would be good to get a few people to look at the code, specifically
> at how the plan gets generated. Clone the branch and give it a try with
> some of your jobs.
> # If the approach looks okay overall, I can put together a mini design doc
> and add some answers to the above limitations of this approach.
> # Feedback most welcome.
> *Example Code:*
> {code}
> sc.explainOn
> sc.disableExecution
> val rdd = sc.parallelize(1 to 10, 4).map(key => (key.toString, key))
> val rdd2 = sc.parallelize(1 to 5, 2).map(key => (key.toString, key))
> rdd.join(rdd2)
>    .count()
> {code}
> *Example Output:*
> {noformat}
> EXPLAIN PLAN:
> +---------------+     +---------------+
> |               |     |               |
> |Stage: 0 @ map |     |Stage: 1 @ map |
> |   Tasks: 4    |     |   Tasks: 2    |
> |               |     |               |
> +---------------+     +---------------+
>         |                     |
>         v                     v
>       +-----------------+
>       |                 |
>       |Stage: 2 @ count |
>       |    Tasks: 4     |
>       |                 |
>       +-----------------+
>
> STAGE DETAILS:
> --------------
> Stage: 0
> Callsite: map at <console>:12
>   ShuffleMapTask
>     PartitionId: 0  Type: ParallelCollectionPartition
>     Binary Size: 2.2 KB
>   ShuffleMapTask
>     PartitionId: 1  Type: ParallelCollectionPartition
>     Binary Size: 2.2 KB
>   ShuffleMapTask
>     PartitionId: 2  Type: ParallelCollectionPartition
>     Binary Size: 2.2 KB
>   ShuffleMapTask
>     PartitionId: 3  Type: ParallelCollectionPartition
>     Binary Size: 2.2 KB
> RDD Chain:
> +---------------------+
> |                     |
> |RDD: 0 @ parallelize |
> |ParallelCollectionRDD|
> |                     |
> +---------------------+
>            |
>            v
>   +----------------+
>   |                |
>   |  RDD: 1 @ map  |
>   |MapPartitionsRDD|
>   |                |
>   +----------------+
>
> Stage: 1
> Callsite: map at <console>:12
>   ShuffleMapTask
>     PartitionId: 0  Type: ParallelCollectionPartition
>     Binary Size: 2.2 KB
>   ShuffleMapTask
>     PartitionId: 1  Type: ParallelCollectionPartition
>     Binary Size: 2.2 KB
> RDD Chain:
> +---------------------+
> |                     |
> |RDD: 2 @ parallelize |
> |ParallelCollectionRDD|
> |                     |
> +---------------------+
>            |
>            v
>   +----------------+
>   |                |
>   |  RDD: 3 @ map  |
>   |MapPartitionsRDD|
>   |                |
>   +----------------+
>
> Stage: 2
> Callsite: count at <console>:19
>   ResultTask
>     PartitionId: 0  Type: CoGroupPartition
>     Binary Size: 2.4 KB
>   ResultTask
>     PartitionId: 1  Type: CoGroupPartition
>     Binary Size: 2.4 KB
>   ResultTask
>     PartitionId: 2  Type: CoGroupPartition
>     Binary Size: 2.4 KB
>   ResultTask
>     PartitionId: 3  Type: CoGroupPartition
>     Binary Size: 2.4 KB
> RDD Chain:
> +--------------+
> |              |
> |RDD: 4 @ join |
> | CoGroupedRDD |
> |              |
> +--------------+
>        |
>        v
> +----------------+
> |                |
> | RDD: 5 @ join  |
> |MapPartitionsRDD|
> |                |
> +----------------+
>        |
>        v
> +----------------+
> |                |
> | RDD: 6 @ join  |
> |MapPartitionsRDD|
> |                |
> +----------------+
> {noformat}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org