Hey guys,

I’m trying to run connected components on graphs that take a fairly large 
number of iterations (25-30) and 5-6 hours to finish. More than half the time 
I get fetch failures and lose an executor after a number of iterations. Spark 
then has to go back and recompute the pieces it lost, which don’t seem to be 
persisted at the same level as the graph, so those iterations take much longer 
and I have to kill the job because it’s not worth waiting for it to finish.

The approach I’m currently trying is checkpointing the vertices and edges (and 
maybe the messages?) in Pregel. So far I’ve been testing with the patch below, 
which seems to be working. I haven’t had any failures since I added this 
change, though, so I don’t know whether a failure would still make it recompute 
from the start. I’m also seeing things like 5 instances of VertexRDDs being 
persisted at the same time, and “reduce at VertexRDD.scala:111” runs twice each 
time. Is this the proper / most efficient way of doing this checkpointing, and 
if not, what would work better?

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 5e55620..5be40c3 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -134,6 +134,11 @@ object Pregel extends Logging {
       g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
       g.cache()

+      g.vertices.checkpoint()
+      g.vertices.count()
+      g.edges.checkpoint()
+      g.edges.count()
+
       val oldMessages = messages
       // Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't
       // get to send messages. We must cache messages so it can be materialized on the next line,
@@ -142,6 +147,7 @@ object Pregel extends Logging {
       // The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This
       // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
       // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
+      messages.checkpoint()
       activeMessages = messages.count()

       logInfo("Pregel finished iteration " + i)
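
A possible variation on the patch above, as a rough sketch only: checkpoint 
every few iterations instead of every iteration, so that the lineage is still 
cut periodically but the checkpoint write is not paid on each pass. The names 
PeriodicCheckpoint, shouldCheckpoint, maybeCheckpoint and the interval 
parameter are hypothetical and not part of GraphX. The sketch assumes a Spark 
version that provides Graph.checkpoint(), that sc.setCheckpointDir(...) has 
already been called, and that the graph is cached before the helper runs.

```scala
import org.apache.spark.graphx.Graph

// Hypothetical helper, not part of GraphX.
object PeriodicCheckpoint {

  // True on iterations that are a multiple of `interval`.
  // A non-positive interval disables checkpointing.
  def shouldCheckpoint(iteration: Int, interval: Int): Boolean =
    interval > 0 && iteration % interval == 0

  // Assumes sc.setCheckpointDir(...) was called earlier and that `g` is
  // already cached, so writing the checkpoint does not recompute the graph.
  def maybeCheckpoint[VD, ED](g: Graph[VD, ED], iteration: Int, interval: Int): Unit = {
    if (shouldCheckpoint(iteration, interval)) {
      // Mark the vertex and edge RDDs for checkpointing. This has to happen
      // before any action has been run on them.
      g.checkpoint()
      // Run an action on each RDD so the checkpoint files actually get written.
      g.vertices.count()
      g.edges.count()
    }
  }
}
```

Inside the Pregel loop this would be called right after g.cache(), for example 
PeriodicCheckpoint.maybeCheckpoint(g, i, 5). Whether an interval of 5 is a 
good trade-off between checkpoint cost and recomputation cost is untested.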

Best Regards,

Jeffrey Picard
