[ 
https://issues.apache.org/jira/browse/SPARK-10335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15250143#comment-15250143
 ] 

Aleksandr commented on SPARK-10335:
-----------------------------------

I have a very similar problem. Pregel time of execution depends non linearly on 
the number of iterations. Though I do not run out of memory, I need to recreate 
graph in a similar fashion as the author of the issue. When the graph is 
recreated with the same data the algorithm starts working fast again. I suppose 
it is connected somehow with internals of spark/DAG calculation, but do not 
know for sure. My spark version is 1.6.1

> GraphX Connected Components fail with large number of iterations
> ----------------------------------------------------------------
>
>                 Key: SPARK-10335
>                 URL: https://issues.apache.org/jira/browse/SPARK-10335
>             Project: Spark
>          Issue Type: Improvement
>          Components: GraphX
>    Affects Versions: 1.4.0
>         Environment: Tested with Yarn client mode
>            Reporter: JJ Zhang
>
> For graphs with long chains of connected vertices, the algorithm fails in 
> practice to converge.
> The driver always runs out of memory prior to final convergence. In my test, 
> for 1.2B vertices/1.2B edges with the longest path requiring more than 90 
> iterations, it invariably fails around iteration 80 with driver memory set at 
> 50G. On top of that, each iteration takes longer than previous, and it took 
> an overnight run on 50 node cluster to the failure point.
> It is presumably due to keeping track of the RDD lineage and the DAG 
> computation. So truncate RDD lineage is the most straight-forward solution.
> Tried using checkpoint, but apparently it is not working as expected in 
> version 1.4.0. Will file another ticket for that issue, but hopefully it has 
> been resolved by 1.5.
> Proposed solution below. This was tested and able to converge after 99 
> iterations on the same graph mentioned above, in less than an hour.
> {code: title=RobustConnectedComponents.scala|borderStyle=solid}
> import org.apache.spark.Logging
> import scala.reflect.ClassTag
> import org.apache.spark.graphx._
> object RobustConnectedComponents extends Logging with java.io.Serializable {
>   def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], interval: Int = 
> 50, dir : String): (Graph[VertexId, String], Int) = {
>     val ccGraph = graph.mapVertices { case (vid, _) => vid }.mapEdges { x 
> =>""}
>     def sendMessage(edge: EdgeTriplet[VertexId, String]): Iterator[(VertexId, 
> Long)] = {
>       if (edge.srcAttr < edge.dstAttr) {
>         Iterator((edge.dstId, edge.srcAttr))
>       } else if (edge.srcAttr > edge.dstAttr) {
>         Iterator((edge.srcId, edge.dstAttr))
>       } else {
>         Iterator.empty
>       }
>     }
>     val initialMessage = Long.MaxValue
>    
>     
>     var g: Graph[VertexId, String] = ccGraph
>     var i = interval
>     var count = 0
>     while (i == interval) {
>       g = refreshGraph(g, dir, count)
>       g.cache()
>       val (g1, i1) = pregel(g, initialMessage, interval, activeDirection = 
> EdgeDirection.Either)(
>         vprog = (id, attr, msg) => math.min(attr, msg),
>         sendMsg = sendMessage,
>         mergeMsg = (a, b) => math.min(a, b))
>       g.unpersist()
>       g = g1
>       i = i1
>       count = count + i
>       logInfo("Checkpoint reached. iteration so far: " + count)
>     }
>     logInfo("Final Converge: Total Iteration:" + count)
>     (g, count)
>   } // end of connectedComponents
>   
>   def refreshGraph(g : Graph[VertexId, String], dir:String, count:Int): 
> Graph[VertexId, String] = {
>     val vertFile = dir + "/iter-" + count + "/vertices"
>     val edgeFile = dir + "/iter-" + count + "/edges"
>     g.vertices.saveAsObjectFile(vertFile)
>     g.edges.saveAsObjectFile(edgeFile)
>     
>     //load back
>     val v : RDD[(VertexId, VertexId)] = 
> g.vertices.sparkContext.objectFile(vertFile)
>     val e : RDD[Edge[String]]= g.vertices.sparkContext.objectFile(edgeFile)
>     
>     val newGraph = Graph(v, e)
>     newGraph
>   }
>   def pregel[VD: ClassTag, ED: ClassTag, A: ClassTag](graph: Graph[VD, ED],
>     initialMsg: A,
>     maxIterations: Int = Int.MaxValue,
>     activeDirection: EdgeDirection = EdgeDirection.Either)(vprog: (VertexId, 
> VD, A) => VD,
>       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
>       mergeMsg: (A, A) => A): (Graph[VD, ED], Int) =
>     {
>       var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata,  
> initialMsg)).cache()
>       // compute the messages
>       var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
>       var activeMessages = messages.count()
>       // Loop
>       var prevG: Graph[VD, ED] = null
>       var i = 0
>       while (activeMessages > 0 && i < maxIterations) {
>         // Receive the messages and update the vertices.
>         prevG = g
>         g = g.joinVertices(messages)(vprog).cache()
>         val oldMessages = messages
>         // Send new messages, skipping edges where neither side received a 
> message. We must cache
>         // messages so it can be materialized on the next line, allowing us 
> to uncache the previous
>         // iteration.
>         messages = g.mapReduceTriplets(
>           sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
>         // The call to count() materializes `messages` and the vertices of 
> `g`. This hides oldMessages
>         // (depended on by the vertices of g) and the vertices of prevG 
> (depended on by oldMessages
>         // and the vertices of g).
>         activeMessages = messages.count()
>         // Unpersist the RDDs hidden by newly-materialized RDDs
>         oldMessages.unpersist(blocking = false)
>         prevG.unpersistVertices(blocking = false)
>         prevG.edges.unpersist(blocking = false)
>         // count the iteration
>         i += 1
>       }
>       (g, i)
>     } // end of apply
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to