At 2015-01-22 02:06:37 -0800, NicolasC <nicolas.ch...@inria.fr> wrote:
> I try to execute a simple program that runs the ShortestPaths algorithm
> (org.apache.spark.graphx.lib.ShortestPaths) on a small grid graph.
> I use Spark 1.2.0 downloaded from spark.apache.org.
>
> This program runs for more than 2 hours when the grid size is 70x70 as above,
> and is then killed by the resource manager of the cluster (Torque). After 5-6
> minutes of execution, the Spark master UI does not even respond.
>
> For a grid size of 30x30, the program terminates in about 20 seconds, and for
> a grid size of 50x50 it finishes in about 80 seconds. The problem appears for
> a grid size of 70x70 and above.

Unfortunately this problem is due to a Spark bug. In later iterations of 
iterative algorithms, the lineage maintained for fault tolerance grows long and 
causes Spark to consume increasing amounts of resources for scheduling and task 
serialization.

The workaround is to checkpoint the graph periodically, which writes it to 
stable storage and interrupts the lineage chain before it grows too long.
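In plain RDD terms, each iteration adds another transformation to the lineage, while a checkpoint writes the current data out and replaces the accumulated chain with a read from stable storage. A minimal standalone sketch of that pattern (the loop body is illustrative only, not the Pregel code; the app name and checkpoint path are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    object LineageSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("LineageSketch").setMaster("local[2]"))
        sc.setCheckpointDir("/tmp/spark-checkpoints")

        var rdd = sc.parallelize(1 to 1000)
        for (i <- 1 to 100) {
          rdd = rdd.map(_ + 1).cache()
          if (i % 25 == 0) {
            rdd.checkpoint()  // mark for checkpointing...
            rdd.count()       // ...and materialize, truncating the lineage here
          }
        }
        // Without the periodic checkpoint, toDebugString would show a
        // lineage roughly 100 transformations deep.
        println(rdd.toDebugString)
        sc.stop()
      }
    }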

If you're able to recompile Spark, you can do this by applying the GraphX patch 
at the end of this mail and, before running graph algorithms, calling

    sc.setCheckpointDir("/tmp")

to set the checkpoint directory as desired.
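For reference, a sketch of the driver-side setup for the original grid-graph ShortestPaths run (assuming the patched build; `GraphGenerators.gridGraph` is the GraphX utility for building grid graphs, and the checkpoint path should point at shared storage such as HDFS on a real cluster, not a node-local /tmp):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx.lib.ShortestPaths
    import org.apache.spark.graphx.util.GraphGenerators

    object ShortestPathsDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("ShortestPathsDemo"))

        // Must be set before the algorithm runs, so the patched Pregel loop
        // can checkpoint the graph every 25 iterations.
        sc.setCheckpointDir("/tmp")

        // 70x70 grid graph; compute shortest paths to landmark vertex 0.
        val graph = GraphGenerators.gridGraph(sc, 70, 70)
        val result = ShortestPaths.run(graph, Seq(0L))
        result.vertices.take(5).foreach(println)

        sc.stop()
      }
    }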

Ankur

=== patch begins here ===

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 5e55620..1fbbb87 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -126,6 +126,8 @@ object Pregel extends Logging {
     // Loop
     var prevG: Graph[VD, ED] = null
     var i = 0
+    val checkpoint = g.vertices.context.getCheckpointDir.nonEmpty
+    val checkpointFrequency = 25
     while (activeMessages > 0 && i < maxIterations) {
      // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
       val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
@@ -139,6 +141,14 @@ object Pregel extends Logging {
      // get to send messages. We must cache messages so it can be materialized on the next line,
      // allowing us to uncache the previous iteration.
      messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDirection))).cache()
+
+      if (checkpoint && i % checkpointFrequency == checkpointFrequency - 1) {
+        logInfo("Checkpointing in iteration " + i)
+        g.vertices.checkpoint()
+        g.edges.checkpoint()
+        messages.checkpoint()
+      }
+
      // The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This
      // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
      // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
