Dear Spark developers,
Below is the GraphX Pregel code snippet from
https://spark.apache.org/docs/latest/graphx-programming-guide.html#pregel-api
(the unpersist/cleanup steps are omitted):
while (activeMessages > 0 && i < maxIterations) {
  // Receive the messages:
  // (1st join) Run the vertex program on all vertices that receive messages
  val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
  // (2nd join) Merge the new vertex values back into the graph
  g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) =>
    newOpt.getOrElse(old)
  }.cache()

  // Send messages:
  // Vertices that didn't receive a message above don't appear in newVerts
  // and therefore don't get to send messages. More precisely, the map phase
  // of mapReduceTriplets is only invoked on edges in the activeDir of
  // vertices in newVerts.
  messages = g.mapReduceTriplets(
    sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
  activeMessages = messages.count()
  i += 1
}
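For reference, the variables used in the loop are initialized before it roughly as follows (my paraphrase of GraphX's Pregel.scala; exact details may differ across Spark versions):

  // Run the vertex program on every vertex with the initial message
  var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
  // Compute the first round of messages
  var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
  var activeMessages = messages.count()
  var i = 0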
It seems that the two joins mentioned above can be rewritten as a single outer
join as follows:
g = g.outerJoinVertices(messages) { (vid, old, mess) =>
  mess match {
    case Some(m) => vprog(vid, old, m)
    case None    => old
  }
}
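To make the proposal concrete, the whole loop body could then look roughly like this; I am assuming the active-set hint for mapReduceTriplets can be keyed by the previous round's messages (which contain the same vertex ids as newVerts), though other variants are possible:

  while (activeMessages > 0 && i < maxIterations) {
    // Single join: run the vertex program where a message arrived,
    // keep the old value everywhere else
    g = g.outerJoinVertices(messages) { (vid, old, mess) =>
      mess match {
        case Some(m) => vprog(vid, old, m)
        case None    => old
      }
    }.cache()
    val oldMessages = messages
    // Assumption: use the previous messages as the active set in place of
    // newVerts, since they cover exactly the vertices that received a message
    messages = g.mapReduceTriplets(
      sendMsg, mergeMsg, Some((oldMessages, activeDir))).cache()
    activeMessages = messages.count()
    i += 1
  }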
This code passes PregelSuite (after removing newVerts). Could you elaborate on
why two joins are used instead of one, and why the intermediate variable
`newVerts` is needed? Are there performance considerations behind this?
Best regards, Alexander