Hi everyone, I've run into a strange problem while using the GraphX Pregel interface to implement a simple single-source shortest path algorithm. Below is my code:
    import com.alibaba.fastjson.JSONObject
    import org.apache.spark.graphx._
    import org.apache.spark.{SparkConf, SparkContext}

    object PregelTest {

      def run(graph: Graph[JSONObject, JSONObject]): Graph[JSONObject, JSONObject] = {

        def vProg(v: VertexId, attr: JSONObject, msg: Integer): JSONObject = {
          if (msg < 0) {
            // init message received
            if (v.equals(0.asInstanceOf[VertexId])) attr.put("LENGTH", 0)
            else attr.put("LENGTH", Integer.MAX_VALUE)
          } else {
            attr.put("LENGTH", msg + 1)
          }
          attr
        }

        def sendMsg(triplet: EdgeTriplet[JSONObject, JSONObject]): Iterator[(VertexId, Integer)] = {
          val len = triplet.srcAttr.getInteger("LENGTH")
          // send a message only if the source vertex is reachable
          if (len < Integer.MAX_VALUE) Iterator((triplet.dstId, len))
          else Iterator.empty
        }

        def mergeMsg(msg1: Integer, msg2: Integer): Integer = {
          if (msg1 < msg2) msg1 else msg2
        }

        Pregel(graph, new Integer(-1), 3, EdgeDirection.Out)(vProg, sendMsg, mergeMsg)
      }

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Pregel Test")
        conf.set("spark.master", "local")
        val sc = new SparkContext(conf)

        // create a minimal test graph with 3 vertices and 2 edges: 0 -> 1 -> 2
        val vertexList = Array(
          (0.asInstanceOf[VertexId], new JSONObject()),
          (1.asInstanceOf[VertexId], new JSONObject()),
          (2.asInstanceOf[VertexId], new JSONObject()))
        val edgeList = Array(
          Edge(0.asInstanceOf[VertexId], 1.asInstanceOf[VertexId], new JSONObject()),
          Edge(1.asInstanceOf[VertexId], 2.asInstanceOf[VertexId], new JSONObject()))
        val vertexRdd = sc.parallelize(vertexList)
        val edgeRdd = sc.parallelize(edgeList)
        val g = Graph[JSONObject, JSONObject](vertexRdd, edgeRdd)

        // run test code
        val lpa = run(g)
        lpa
      }
    }

When I run this code, the result is wrong. Vertex 2 ends up with "LENGTH" equal to Integer.MAX_VALUE, but it should be 2. The message sent to vertex 2 appears to have been lost.
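Here is one possible explanation. It is an assumption, not a confirmed diagnosis. In GraphX 1.x, mapVertices and joinVertices have an optimization for the case where the vertex type stays the same. In that case they send to the edge partitions only those vertices whose attribute compares unequal under VertexRDD.diff. The vProg above mutates the JSONObject in place and returns the same reference. The old and new values can therefore be the very same object, so they compare equal and the update never reaches triplet.srcAttr. The stdlib-only model below shows how such a diff behaves with a mutating vProg versus a copying one. Every name in it is hypothetical: InPlaceMutationDemo, the Attr map standing in for JSONObject, and changedIds.

```scala
import scala.collection.mutable

// Hypothetical model of diff-based incremental replication (an assumption about
// GraphX internals): only ids whose attribute compares unequal are "shipped".
object InPlaceMutationDemo {
  // Stand-in for the mutable JSONObject vertex attribute
  type Attr = mutable.Map[String, Int]

  // Ids whose attribute differs (by !=) between the old and new snapshot
  def changedIds[A](before: Map[Long, A], after: Map[Long, A]): Set[Long] =
    after.collect { case (id, a) if before.get(id).exists(_ != a) => id }.toSet

  // Style of the vProg in the question: mutates attr and returns the same object
  def mutatingVProg(attr: Attr, msg: Int): Attr = {
    attr("LENGTH") = msg + 1
    attr
  }

  // Alternative: leaves attr untouched and returns an updated copy
  def copyingVProg(attr: Attr, msg: Int): Attr = {
    val copy = attr.clone()
    copy("LENGTH") = msg + 1
    copy
  }

  // Vertex 1 starts unreachable; the "new" snapshot is derived from the old one
  def run(vprog: (Attr, Int) => Attr): Set[Long] = {
    val before: Map[Long, Attr] = Map(1L -> mutable.Map("LENGTH" -> Int.MaxValue))
    val after: Map[Long, Attr] = before.map { case (id, a) => id -> vprog(a, 0) }
    changedIds(before, after)
  }

  def main(args: Array[String]): Unit = {
    println(s"mutating vProg, changed ids: ${run(mutatingVProg)}") // prints Set()
    println(s"copying vProg, changed ids: ${run(copyingVProg)}")   // prints Set(1)
  }
}
```

With the mutating version, the "before" snapshot holds the same object that was just updated, so the diff reports no change. With the copying version, vertex 1 is reported as changed. If this is the cause in GraphX, returning a fresh object from vProg instead of the mutated input should avoid it.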
I then stepped through Pregel.scala in the debugger and looked at the code shown in this screenshot: <http://apache-spark-user-list.1001560.n3.nabble.com/file/n28100/%E7%B2%98%E8%B4%B4%E5%9B%BE%E7%89%87.png>

In the first iteration (iteration 0), the variable "messages" on line 138 is reconstructed and then recomputed on line 143. At that point "activeMessages" gets the value 0, which means the messages are lost.

Next I set a breakpoint on line 138. Before that line executed, I evaluated the expression "g.triplets().collect()", which just collects the updated graph data. After I did this and ran the rest of the code, "messages" was no longer empty and "activeMessages" got the value 1, as expected.

I have tested the code with Spark/GraphX 1.4 and 1.6 on Scala 2.10 and got the same result in both.

This problem really confuses me. I've spent almost two weeks trying to resolve it and have run out of ideas. If this is not a bug, I can't understand how evaluating a non-intrusive expression could change the result. "g.triplets().collect()" only collects the data and does no computation of its own.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Pregel-not-update-vertex-state-properly-cause-messages-loss-tp28100.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
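If the in-place mutation of the vertex attribute is indeed the culprit (an assumption, not established in this thread), one workaround is to keep vertex attributes immutable. The sketch below adapts the single-source shortest path example from the GraphX programming guide to hop counts, using a plain Int attribute instead of a JSONObject. Several details are illustrative choices, not taken from the post: the object name PregelHopCount, the hops helper, the Unreachable sentinel, and maxIterations = 10. It needs spark-core and spark-graphx on the classpath.

```scala
import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}

object PregelHopCount {
  // Sentinel meaning "not reached yet" (illustrative choice)
  val Unreachable: Int = Int.MaxValue

  // Hop count from `source`; edge attributes are ignored, every edge costs one hop
  def hops[VD, ED: ClassTag](graph: Graph[VD, ED], source: VertexId): Graph[Int, ED] = {
    // Fresh, immutable Int attributes: 0 at the source, Unreachable elsewhere
    val init: Graph[Int, ED] =
      graph.mapVertices((id, _) => if (id == source) 0 else Unreachable)

    init.pregel(Unreachable, maxIterations = 10, activeDirection = EdgeDirection.Out)(
      // vprog returns a new value; nothing is mutated in place
      (_, dist, msg) => math.min(dist, msg),
      // send only if the source is reached and the destination would improve
      triplet =>
        if (triplet.srcAttr != Unreachable && triplet.srcAttr + 1 < triplet.dstAttr)
          Iterator((triplet.dstId, triplet.srcAttr + 1))
        else Iterator.empty,
      // keep the shortest candidate when several messages reach one vertex
      (a, b) => math.min(a, b)
    )
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Pregel hop count").setMaster("local[*]"))
    try {
      // Same chain as in the question: 0 -> 1 -> 2
      val vertices = sc.parallelize(Seq((0L, "a"), (1L, "b"), (2L, "c")))
      val edges = sc.parallelize(Seq(Edge(0L, 1L, 0), Edge(1L, 2L, 0)))
      val result = hops(Graph(vertices, edges), source = 0L)
      result.vertices.collect().sortBy(_._1).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

The sketch needs `import scala.reflect.ClassTag` for the context bound on ED; add it next to the other imports if your Spark imports do not bring it into scope. On the 0 -> 1 -> 2 chain this should print (0,0), (1,1) and (2,2). Because sendMsg fires only when a message would improve the destination, Pregel stops as soon as no vertex can improve, rather than always running until maxIterations.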