The problem comes from an inconsistency between the graph's triplet view and
its vertex view. The message is not really lost; it is simply never sent,
because the sendMsg function reads a stale value of srcAttr!

This is not a new bug. I ran into a similar one in version 1.2.1, reported
as SPARK-6378 <https://issues.apache.org/jira/browse/SPARK-6378>. I can
reproduce that inconsistency with a small, simple program (see that JIRA
issue for more details). It seems that in some situations the triplet view
of a Graph object is not updated consistently with the vertex view. The
GraphX Pregel API relies heavily on the mapReduceTriplets (old) /
aggregateMessages (new) API, which in turn relies on the triplet view
behaving correctly. Thus this bug affects the behavior of the Pregel API.
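To illustrate that dependency: every message that aggregateMessages produces is computed from triplet fields, so a stale srcAttr in the triplet view silently suppresses sends. A minimal sketch (the Int distance property and the tiny two-vertex graph are my own illustration, not your program):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object AggregateMessagesSketch {
  // Collects the messages that one relaxation step would produce.
  def compute(sc: SparkContext): Array[(VertexId, Int)] = {
    // Two vertices: 0 is reachable (distance 0), 1 is not yet (MAX_VALUE).
    val vertices = sc.parallelize(Array((0L, 0), (1L, Int.MaxValue)))
    val edges = sc.parallelize(Array(Edge(0L, 1L, 0)))
    val g = Graph(vertices, edges)

    // The message is computed entirely from the triplet view: if
    // ctx.srcAttr were stale (still Int.MaxValue), no message would
    // be sent at all -- exactly the symptom described above.
    g.aggregateMessages[Int](
      ctx => if (ctx.srcAttr < Int.MaxValue) ctx.sendToDst(ctx.srcAttr + 1),
      (a, b) => math.min(a, b)
    ).collect
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("agg-sketch").setMaster("local"))
    compute(sc).foreach(println)
    sc.stop()
  }
}
```

With a fresh triplet view this produces the single message (1, 1); with a stale srcAttr the same call would return nothing.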

I cannot figure out why the bug appears either, but I suspect it is
connected to the data type of the vertex property. If you use *primitive*
types such as Double and Long, it is OK. But if you use a user-defined type
with mutable fields, such as a mutable Map or mutable ArrayBuffer, the bug
appears. In your case I notice that you use JSONObject as your vertex data
type. Looking at the definition of JSONObject, it stores its data in a Java
map, which is mutable. To work around the bug for now, you can change your
vertex property type to avoid any mutable data: replace mutable collections
with the immutable collections provided by Scala, and replace var fields
with val fields. At least, that workaround works for me.
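As a concrete illustration of this workaround, here is a sketch of the same 3-node shortest-path computation with an immutable Double distance as the vertex property instead of JSONObject (the object and method names are my own, not from your program):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object ImmutableSsspSketch {
  // Same 3-node chain as in the original program, but the vertex property
  // is an immutable Double (the current shortest distance) instead of a
  // mutable JSONObject.
  def compute(sc: SparkContext): Map[VertexId, Double] = {
    val vertices = sc.parallelize(Array(
      (0L, Double.PositiveInfinity),
      (1L, Double.PositiveInfinity),
      (2L, Double.PositiveInfinity)))
    val edges = sc.parallelize(Array(
      Edge(0L, 1L, 1.0),
      Edge(1L, 2L, 1.0)))

    // Initialize: the source (vertex 0) gets distance 0, the rest infinity.
    val init = Graph(vertices, edges)
      .mapVertices((id, _) => if (id == 0L) 0.0 else Double.PositiveInfinity)

    val sssp = Pregel(init, Double.PositiveInfinity, maxIterations = 3,
        activeDirection = EdgeDirection.Out)(
      // vprog: keep the smaller of the current distance and the message
      (_, dist, msg) => math.min(dist, msg),
      // sendMsg: relax an edge only if it improves the destination
      triplet =>
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        else Iterator.empty,
      // mergeMsg: keep the minimum incoming distance
      (a, b) => math.min(a, b))

    sssp.vertices.collect.toMap
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("Immutable SSSP Sketch").setMaster("local"))
    compute(sc).toSeq.sortBy(_._1).foreach { case (id, d) =>
      println(s"vertex $id -> distance $d")
    }
    sc.stop()
  }
}
```

With the immutable Double property, vertex 2 correctly ends up at distance 2.0 instead of staying at its initial value.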

Zhaokang Wang

2016-11-18 11:47 GMT+08:00 fuz_woo <fuz....@qq.com>:

> Hi everyone, I encountered a strange problem these days while attempting
> to use the GraphX Pregel interface to implement a simple
> single-source shortest-path algorithm.
> Below is my code:
>
> import com.alibaba.fastjson.JSONObject
> import org.apache.spark.graphx._
>
> import org.apache.spark.{SparkConf, SparkContext}
>
> object PregelTest {
>
>   def run(graph: Graph[JSONObject, JSONObject]): Graph[JSONObject,
> JSONObject] = {
>
>     def vProg(v: VertexId, attr: JSONObject, msg: Integer): JSONObject = {
>       if ( msg < 0 ) {
>         // init message received
>         if ( v.equals(0.asInstanceOf[VertexId]) ) attr.put("LENGTH", 0)
>         else attr.put("LENGTH", Integer.MAX_VALUE)
>       } else {
>         attr.put("LENGTH", msg+1)
>       }
>       attr
>     }
>
>     def sendMsg(triplet: EdgeTriplet[JSONObject, JSONObject]):
> Iterator[(VertexId, Integer)] = {
>       val len = triplet.srcAttr.getInteger("LENGTH")
>       // send a msg if last hub is reachable
>       if ( len<Integer.MAX_VALUE ) Iterator((triplet.dstId, len))
>       else Iterator.empty
>     }
>
>     def mergeMsg(msg1: Integer, msg2: Integer): Integer = {
>       if ( msg1 < msg2 ) msg1 else msg2
>     }
>
>     Pregel(graph, new Integer(-1), 3, EdgeDirection.Out)(vProg, sendMsg,
> mergeMsg)
>   }
>
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("Pregel Test")
>     conf.set("spark.master", "local")
>     val sc = new SparkContext(conf)
>
>     // create a simplest test graph with 3 nodes and 2 edges
>     val vertexList = Array(
>       (0.asInstanceOf[VertexId], new JSONObject()),
>       (1.asInstanceOf[VertexId], new JSONObject()),
>       (2.asInstanceOf[VertexId], new JSONObject()))
>     val edgeList = Array(
>       Edge(0.asInstanceOf[VertexId], 1.asInstanceOf[VertexId], new
> JSONObject()),
>       Edge(1.asInstanceOf[VertexId], 2.asInstanceOf[VertexId], new
> JSONObject()))
>
>     val vertexRdd = sc.parallelize(vertexList)
>     val edgeRdd = sc.parallelize(edgeList)
>     val g = Graph[JSONObject, JSONObject](vertexRdd, edgeRdd)
>
>     // run test code
>     val lpa = run(g)
>     lpa
>   }
> }
>
> And after I run the code, I get an incorrect result in which vertex 2
> has a "LENGTH" label valued Integer.MAX_VALUE; it seems that the
> messages sent to vertex 2 were lost unexpectedly. I then followed the
> debugger into Pregel.scala, where I saw the code:
>
> <http://apache-spark-user-list.1001560.n3.nabble.com/
> file/n28100/%E7%B2%98%E8%B4%B4%E5%9B%BE%E7%89%87.png>
>
> In the first iteration (iteration 0), the variable messages at line 138
> is reconstructed and then recomputed at line 143, where activeMessages
> gets the value 0, which means the messages are lost.
> Then I set a breakpoint at line 138, and before it executed I evaluated
> the expression "g.triplets().collect()", which just collects the updated
> graph data. After doing this and running the rest of the code, the
> messages are no longer empty and activeMessages gets the value 1, as
> expected.
>
> I have tested the code with both Spark/GraphX 1.4 and 1.6 on Scala 2.10,
> and got the same result.
>
> I must say this problem leaves me really confused. I've spent almost 2
> weeks trying to resolve it and now have no idea what to do. If this is
> not a bug, I totally can't understand why merely executing a
> non-intrusive expression (g.triplets().collect(), which just collects
> the data and does no computation) could change the outcome; it's really
> ridiculous.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/GraphX-Pregel-not-update-vertex-
> state-properly-cause-messages-loss-tp28100.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
