At 2015-01-22 02:06:37 -0800, NicolasC <nicolas.ch...@inria.fr> wrote:
> I try to execute a simple program that runs the ShortestPaths algorithm
> (org.apache.spark.graphx.lib.ShortestPaths) on a small grid graph.
> I use Spark 1.2.0 downloaded from spark.apache.org.
>
> This program runs for more than 2 hours when the grid size is 70x70 as above,
> and is then killed by the resource manager of the cluster (Torque). After
> 5-6 minutes of execution, the Spark master UI does not even respond.
>
> For a grid size of 30x30, the program terminates in about 20 seconds, and
> for a grid size of 50x50 it finishes in about 80 seconds. The problem
> appears for a grid size of 70x70 and above.
Unfortunately this problem is due to a Spark bug. In later iterations of
iterative algorithms, the lineage maintained for fault tolerance grows long
and causes Spark to consume increasing amounts of resources for scheduling
and task serialization. The workaround is to checkpoint the graph
periodically, which writes it to stable storage and interrupts the lineage
chain before it grows too long.

If you're able to recompile Spark, you can do this by applying the patch to
GraphX at the end of this mail and, before running graph algorithms, calling
sc.setCheckpointDir("/tmp") to set the checkpoint directory as desired.

Ankur

=== patch begins here ===
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 5e55620..1fbbb87 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -126,6 +126,8 @@ object Pregel extends Logging {
     // Loop
     var prevG: Graph[VD, ED] = null
     var i = 0
+    val checkpoint = g.vertices.context.getCheckpointDir.nonEmpty
+    val checkpointFrequency = 25
     while (activeMessages > 0 && i < maxIterations) {
       // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
       val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
@@ -139,6 +141,14 @@ object Pregel extends Logging {
       // get to send messages. We must cache messages so it can be materialized on the next line,
       // allowing us to uncache the previous iteration.
       messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDirection))).cache()
+
+      if (checkpoint && i % checkpointFrequency == checkpointFrequency - 1) {
+        logInfo("Checkpointing in iteration " + i)
+        g.vertices.checkpoint()
+        g.edges.checkpoint()
+        messages.checkpoint()
+      }
+
       // The call to count() materializes `messages`, `newVerts`, and the vertices of `g`.
       // This hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and
       // the vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
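To make the schedule in the patch concrete: with checkpointFrequency = 25, the condition `i % checkpointFrequency == checkpointFrequency - 1` fires on iterations 24, 49, 74, ..., so the lineage chain between checkpoints stays bounded at 25 iterations. A standalone sketch of just that condition (plain Scala, no Spark required; the object name `CheckpointSchedule` is mine, not from the patch):

```scala
// Sketch of the checkpoint schedule used in the patch above.
// Only the modular-arithmetic condition is reproduced here.
object CheckpointSchedule {
  val checkpointFrequency = 25 // same constant as in the patch

  // True on iterations 24, 49, 74, ... — i.e. every 25th iteration,
  // counting from iteration 0.
  def shouldCheckpoint(i: Int): Boolean =
    i % checkpointFrequency == checkpointFrequency - 1

  def main(args: Array[String]): Unit = {
    val triggered = (0 until 100).filter(shouldCheckpoint)
    println(triggered.mkString(" ")) // prints: 24 49 74 99
  }
}
```

The `checkpointFrequency - 1` offset avoids checkpointing on iteration 0, when the lineage is still trivially short.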