I'm trying to set up a simple iterative message/update problem in GraphX (Spark 1.2.0), but I'm running into issues with caching and recomputation of data. I'm trying to follow the pattern used in GraphX's Pregel implementation: materialize and cache the messages and the graph, then unpersist them once the next cycle has finished. It doesn't seem to be working. Every cycle gets progressively slower, and more and more of the values seem to be recomputed despite my attempts to cache them.
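To illustrate the symptom (each cycle slower, earlier cycles recomputed), here is a small, self-contained Scala sketch that models lazy lineage without Spark. The `Lazy` class and `LineageDemo.run` are hypothetical stand-ins, not GraphX APIs. Each cycle depends on the previous cycle along two paths: directly, and through "messages" derived from it. This loosely mirrors how the graph produced by `outerJoinVertices` depends on both the old graph and the messages aggregated from that same old graph. With caching, each cycle is computed once. Without it, every access re-runs the whole chain, so the work roughly doubles per cycle. It is not meant to reproduce the exact log pattern shown in the question.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of lazy lineage (not GraphX itself): a "dataset" is a thunk
// that is re-run on every access unless caching is enabled.
final class Lazy[A](compute: () => A, cache: Boolean) {
  private var memo: Option[A] = None
  def get(): A = memo.getOrElse {
    val v = compute()
    if (cache) memo = Some(v)
    v
  }
}

object LineageDemo {
  // Builds cycles 0..n, where cycle k depends on cycle k-1 along two paths
  // (directly, and through "messages" derived from it), then forces each
  // cycle in order, the way the driver loop forces work with count().
  // Returns the order in which cycle numbers were logged.
  def run(n: Int, cache: Boolean): List[Int] = {
    val log = ArrayBuffer[Int]()
    val graphs = ArrayBuffer[Lazy[Int]]()
    graphs += new Lazy(() => { log += 0; 0 }, cache)
    for (k <- 1 to n) {
      val prev = graphs(k - 1)
      // "messages" computed from the previous graph (nothing logged here)
      val messages = new Lazy(() => prev.get() + 1, cache)
      // the new graph joins the previous graph with its messages and logs k
      graphs += new Lazy(() => { val v = prev.get() + messages.get(); log += k; v }, cache)
    }
    graphs.foreach(_.get())
    log.toList
  }

  def main(args: Array[String]): Unit = {
    println(run(3, cache = true))
    println(run(3, cache = false))
  }
}
```

With caching, `run(3, cache = true)` logs `List(0, 1, 2, 3)`. Without it, `run(3, cache = false)` logs 26 entries, because forcing cycle k re-evaluates cycle k-1 twice.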
The code:

```
var oldMessages: VertexRDD[List[Message]] = null
var oldGraph: Graph[MyVertex, MyEdge] = null
curGraph = curGraph.mapVertices((x, y) => y.init())
for (i <- 0 to cycle_count) {
  val curMessages = curGraph.aggregateMessages[List[Message]](
    x => {
      // send messages
      .....
    },
    (x, y) => {
      // collect messages into lists
      val out = x ++ y
      out
    }
  ).cache()
  curMessages.count()
  val ti = i
  oldGraph = curGraph
  curGraph = curGraph.outerJoinVertices(curMessages)(
    (vid, vertex, message) => vertex.process(message.getOrElse(List[Message]()), ti)
  ).cache()
  curGraph.vertices.count()
  oldGraph.unpersistVertices(blocking = false)
  oldGraph.edges.unpersist(blocking = false)
  oldGraph = curGraph
  if (oldMessages != null) {
    oldMessages.unpersist(blocking = false)
  }
  oldMessages = curMessages
}
```

The `MyVertex.process` method takes the list of incoming messages, averages them, and returns a new `MyVertex` object. I've also set it up to append the cycle number (the second argument) to a log file named after the vertex. This is what ends up in the log file for every vertex (in exactly the same pattern):

```
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 2
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 3
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 4
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 3
Cycle: 5
```

Any ideas about what I might be doing wrong with the caching? And how can I avoid recomputing so many of the values?

Kyle