Hi everyone, I ran into a strange problem recently while trying to use the
GraphX Pregel interface to implement a simple single-source shortest-path
algorithm.
Below is my code:

import com.alibaba.fastjson.JSONObject
import org.apache.spark.graphx._

import org.apache.spark.{SparkConf, SparkContext}

object PregelTest {

  def run(graph: Graph[JSONObject, JSONObject]): Graph[JSONObject, JSONObject] = {

    def vProg(v: VertexId, attr: JSONObject, msg: Integer): JSONObject = {
      if ( msg < 0 ) {
        // init message received
        if ( v.equals(0.asInstanceOf[VertexId]) ) attr.put("LENGTH", 0)
        else attr.put("LENGTH", Integer.MAX_VALUE)
      } else {
        attr.put("LENGTH", msg+1)
      }
      attr
    }

    def sendMsg(triplet: EdgeTriplet[JSONObject, JSONObject]): Iterator[(VertexId, Integer)] = {
      val len = triplet.srcAttr.getInteger("LENGTH")
      // send a msg if last hub is reachable
      if ( len<Integer.MAX_VALUE ) Iterator((triplet.dstId, len))
      else Iterator.empty
    }

    def mergeMsg(msg1: Integer, msg2: Integer): Integer = {
      if ( msg1 < msg2 ) msg1 else msg2
    }

    Pregel(graph, new Integer(-1), 3, EdgeDirection.Out)(vProg, sendMsg, mergeMsg)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Pregel Test")
    conf.set("spark.master", "local")
    val sc = new SparkContext(conf)

    // create a simplest test graph with 3 nodes and 2 edges
    val vertexList = Array(
      (0.asInstanceOf[VertexId], new JSONObject()),
      (1.asInstanceOf[VertexId], new JSONObject()),
      (2.asInstanceOf[VertexId], new JSONObject()))
    val edgeList = Array(
      Edge(0.asInstanceOf[VertexId], 1.asInstanceOf[VertexId], new JSONObject()),
      Edge(1.asInstanceOf[VertexId], 2.asInstanceOf[VertexId], new JSONObject()))

    val vertexRdd = sc.parallelize(vertexList)
    val edgeRdd = sc.parallelize(edgeList)
    val g = Graph[JSONObject, JSONObject](vertexRdd, edgeRdd)

    // run test code and print each vertex with its attributes
    val lpa = run(g)
    lpa.vertices.collect().foreach(println)
  }
}
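
For reference, this is the result I expect. Below is a minimal plain-Scala
sketch (no Spark; the object name ExpectedLengths and its run helper are made
up for illustration, and the loop is only my simplified reading of how the
supersteps proceed) that replays the same vProg / sendMsg / mergeMsg logic by
hand on the 3-node test graph:

```scala
object ExpectedLengths {
  val Unreachable: Int = Int.MaxValue

  // Same test graph as above: edges 0 -> 1 and 1 -> 2.
  val edges: Seq[(Long, Long)] = Seq((0L, 1L), (1L, 2L))

  def run(maxIterations: Int): Map[Long, Int] = {
    // Effect of the initial message (-1): vertex 0 gets 0, the others MAX_VALUE.
    var lengths: Map[Long, Int] =
      Map(0L -> 0, 1L -> Unreachable, 2L -> Unreachable)
    // Every vertex may send in the first round; afterwards only vertices
    // that received a message in the previous round may send.
    var active: Set[Long] = lengths.keySet
    var i = 0
    while (active.nonEmpty && i < maxIterations) {
      // sendMsg: a reachable source sends its LENGTH along each out-edge.
      // mergeMsg: keep the smallest value per destination.
      val messages: Map[Long, Int] = edges
        .filter { case (src, _) => active(src) && lengths(src) < Unreachable }
        .map { case (src, dst) => (dst, lengths(src)) }
        .groupBy(_._1)
        .map { case (dst, msgs) => dst -> msgs.map(_._2).min }
      // vProg: a vertex that received a message sets LENGTH to msg + 1.
      lengths = lengths ++ messages.map { case (dst, msg) => dst -> (msg + 1) }
      active = messages.keySet
      i += 1
    }
    lengths
  }
}
```

ExpectedLengths.run(3) gives LENGTH 0, 1 and 2 for vertices 0, 1 and 2, which
is what I expected from the Pregel run with maxIterations = 3.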

After running the code, I got an incorrect result in which vertex 2 has a
"LENGTH" value of Integer.MAX_VALUE; it seems that the message sent to
vertex 2 was lost unexpectedly. I then stepped through Pregel.scala in the
debugger, where I saw this code:

<http://apache-spark-user-list.1001560.n3.nabble.com/file/n28100/%E7%B2%98%E8%B4%B4%E5%9B%BE%E7%89%87.png>
 
 
In iteration 0, the variable messages is reassigned on line 138 and then
evaluated on line 143, where activeMessages gets the value 0, which means
the messages were lost.
I then set a breakpoint on line 138 and, before that line executed, evaluated
the expression " g.triplets().collect() ", which just collects the updated
graph data. After doing this and letting the rest of the code run, messages
was no longer empty and activeMessages got the value 1, as expected.
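
One thing I have not been able to rule out (this is only a guess on my part,
not something I have confirmed): my vProg modifies the JSONObject in place and
returns the same instance, so any step that decides which vertices changed by
comparing the old attribute with the new one would be comparing an object with
itself. A minimal plain-Scala sketch of that effect, with a hypothetical Attr
class standing in for JSONObject and a hypothetical changed helper standing in
for whatever comparison might be done internally:

```scala
import scala.collection.mutable

object InPlaceUpdateSketch {
  // Hypothetical stand-in for JSONObject: a mutable bag of labels
  // with value-based equality.
  final class Attr(val labels: mutable.Map[String, Int] = mutable.Map.empty) {
    override def equals(other: Any): Boolean = other match {
      case that: Attr => labels == that.labels
      case _          => false
    }
    override def hashCode: Int = labels.hashCode
  }

  // Mirrors my vProg: mutates the attribute and returns the same instance.
  def updateInPlace(attr: Attr, msg: Int): Attr = {
    attr.labels("LENGTH") = msg + 1
    attr
  }

  // Alternative: leaves the input untouched and returns a fresh instance.
  def updateCopy(attr: Attr, msg: Int): Attr = {
    val copy = new Attr(attr.labels.clone())
    copy.labels("LENGTH") = msg + 1
    copy
  }

  // Hypothetical change check: compares the attribute before and after.
  def changed(before: Attr, after: Attr): Boolean = before != after
}
```

With updateInPlace, changed(old, updated) is false even though LENGTH was
modified, because both names refer to the same object; with updateCopy it is
true. If GraphX does something similar when it propagates updated vertices,
that might explain the lost message, but I have not verified this.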

I have tested the code with both Spark and GraphX 1.4 and 1.6 on Scala 2.10,
and got the same result.

I must say this problem really confuses me. I have spent almost 2 weeks
trying to resolve it and have no idea what to try next. If this is not a bug,
I cannot understand why merely evaluating a side-effect-free expression
( g.triplets().collect(), which just collects the data and does no
computation ) could change the outcome; it seems really strange.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Pregel-not-update-vertex-state-properly-cause-messages-loss-tp28100.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
