Hi all,

I have recently run into strange behavior in GraphX's |collectNeighbors| API. It appears to have side effects on the Pregel API, which then stops working as expected. Below is a small code demo that reproduces the behavior; the complete source code is in the attachment.

Steps to reproduce the side effects:

1. Create a little toy graph with a simple vertex property type:

   |class VertexProperty(val inNeighbor: ArrayBuffer[Long] = ArrayBuffer[Long]())
     extends Serializable { }

   // Create a data graph. Vertices: 1, 2, 3; Edges: 2 -> 1, 3 -> 1, 2 -> 3.
   ......
   val purGraph = Graph(dataVertex, dataEdge).persist() |

2. Call the |collectNeighbors| method to get both the in-neighbor graph and
   the out-neighbor graph of |purGraph|. Then outer join |inNeighborGraph|
   with |purGraph| to get |dataGraph|:

   |// Get inNeighbor and outNeighbor graph from purGraph
   val inNeighborGraph = purGraph.collectNeighbors(EdgeDirection.In)
   // !!May be BUG here!! If we don't collect out neighbors here, the bug will disappear.
   val outNeighborGraph = purGraph.collectNeighbors(EdgeDirection.Out)
   // Now join the in neighbor vertex id list to every vertex's property
   val dataGraph = purGraph.outerJoinVertices(inNeighborGraph)((vid, property, inNeighborList) => {
     val inNeighborVertexIds =
       inNeighborList.getOrElse(Array[(VertexId, VertexProperty)]()).map(t => t._1)
     property.inNeighbor ++= inNeighborVertexIds.toBuffer
     property
   }) |

3. Run a simple Pregel computation on |dataGraph|. In the vertex program
   phase we do nothing except print some debug info. In the send message
   phase, however, the |inNeighbor| property of vertex 1 has changed! *The
   |inNeighbor| property values of vertex 1 are inconsistent between the
   vertex program phase and the send message phase!*

   |val result = dataGraph.pregel(Array[Long](), maxIterations = 1, EdgeDirection.Both)(
     // vertex program
     (id, property, msg) => vertexProgram(id, property, msg),
     // send messages
     triplet => sendMessage(triplet),
     // combine messages
     (a, b) => messageCombiner(a, b)
   )

   // In the vertex program, we change nothing...
   def vertexProgram(id: VertexId, property: VertexProperty, msgSum: Array[Long]): VertexProperty = {
     if (id == 1L)
       println("[Vertex Program]Vertex 1's inNeighbor property length is:" + property.inNeighbor.length)
     property
   }

   // In the send message phase, we just check the vertex property of the same vertex.
   // We should get the same results in the two phases.
   def sendMessage(edge: EdgeTriplet[VertexProperty, Null]): Iterator[(VertexId, Array[Long])] = {
     // Print vertex 1's inNeighbor length
     if (edge.srcId == 1L)
       println("[Send Message] Vertex 1's inNeighbor property length is:" + edge.srcAttr.inNeighbor.length)
     if (edge.dstId == 1L)
       println("[Send Message] Vertex 1's inNeighbor property length is:" + edge.dstAttr.inNeighbor.length)
     val sIn = edge.srcAttr.inNeighbor
     val dIn = edge.dstAttr.inNeighbor
     // send empty message
     ArrayBuffer[(VertexId, Array[Long])]().toIterator
   }

   def messageCombiner(a: Array[Long], b: Array[Long]): Array[Long] = {
     Array.concat(a, b)
   } |

   In the program output, we get:

   |[Vertex Program]Vertex 1's inNeighbor property length is:2
   [Send Message] Vertex 1's inNeighbor property length is:0
   [Send Message] Vertex 1's inNeighbor property length is:0 |

4. Even more strangely, if we comment out one of the |collectNeighbors|
   calls, everything works. Note that |outNeighborGraph| is never actually
   used in the program, so we can comment out the following statement:

   |// val outNeighborGraph = purGraph.collectNeighbors(EdgeDirection.Out) |

   With that statement commented out, the two phases report the same
   length:

   |[Vertex Program]Vertex 1's inNeighbor property length is:2
   [Send Message] Vertex 1's inNeighbor property length is:2
   [Send Message] Vertex 1's inNeighbor property length is:2 |

The behavior of |collectNeighbors| is strange. It may be a bug in GraphX, or I may be calling this API improperly, or my vertex property type may be unsuitable. Could you please comment on this? Thanks a lot.
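
One variation that might help narrow this down is sketched here. It is not a confirmed fix. The join in step 2 appends to |property.inNeighbor| in place and returns the same object that |purGraph| holds. Assuming that this in-place mutation is relevant (an assumption, not something established above), the join function could return a fresh object instead. The names |ImmutableJoinSketch|, |ImmutableVertexProperty| and |joinInNeighbors| are hypothetical and are not part of the attached code. The |VertexId| alias is repeated locally only so that the sketch runs without Spark on the classpath.

```scala
object ImmutableJoinSketch {
  // GraphX defines VertexId as an alias for Long; repeated here so the
  // sketch compiles and runs without Spark.
  type VertexId = Long

  // Hypothetical immutable counterpart of VertexProperty (not in the attachment).
  final case class ImmutableVertexProperty(inNeighbor: Vector[Long] = Vector.empty)

  // Same shape as the function passed to outerJoinVertices in step 2, but it
  // returns a new object instead of appending to the existing buffer.
  def joinInNeighbors(
      vid: VertexId,
      property: ImmutableVertexProperty,
      inNeighborList: Option[Array[(VertexId, ImmutableVertexProperty)]]
  ): ImmutableVertexProperty = {
    val inNeighborVertexIds =
      inNeighborList.getOrElse(Array.empty[(VertexId, ImmutableVertexProperty)]).map(_._1)
    property.copy(inNeighbor = property.inNeighbor ++ inNeighborVertexIds)
  }

  def main(args: Array[String]): Unit = {
    val original = ImmutableVertexProperty()
    val neighbors = Array((2L, ImmutableVertexProperty()), (3L, ImmutableVertexProperty()))
    val joined = joinInNeighbors(1L, original, Some(neighbors))
    // The joined copy carries the neighbor ids; the original object is untouched.
    println(s"joined length: ${joined.inNeighbor.length}, original length: ${original.inNeighbor.length}")
  }
}
```

With a graph whose vertex type is |ImmutableVertexProperty|, |joinInNeighbors| would take the place of the lambda passed to |outerJoinVertices| in step 2. Whether this changes the Pregel output above is untested.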

Regards,

Zhaokang Wang

PS. We have tested the code on Spark 1.5.1 and 1.6.0.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

import scala.collection.mutable.ArrayBuffer


class VertexProperty(val inNeighbor: ArrayBuffer[Long] = ArrayBuffer[Long]())
  extends Serializable { }

object Main {

  def main(args: Array[String]): Unit = {
    // begin Spark job
    val conf = new SparkConf().setAppName("finding pregel API bug").setMaster("local[2]")
    val sc = new SparkContext(conf)


    // Create a data graph. Vertices: 1, 2, 3; Edges: 2 -> 1, 3 -> 1, 2 -> 3.
    // Every vertex has a property of class VertexProperty.
    val dataEdge =
      sc.makeRDD(Array[Edge[Null]](Edge(2L, 1L, null), Edge(3L, 1L, null), Edge(2L, 3L, null)))
    val dataVertex: RDD[(VertexId, VertexProperty)] = sc.makeRDD(Array(
      (1L, new VertexProperty()), (2L, new VertexProperty()), (3L, new VertexProperty())))
    val purGraph = Graph(dataVertex, dataEdge).persist()
    // !!!!!!!!!!!!!!!!!!!!!
    // Get inNeighbor and outNeighborGraph from purGraph
    val inNeighborGraph = purGraph.collectNeighbors(EdgeDirection.In)
    // !!May be BUG here!! If we don't collect out neighbors here, the bug will disappear.
    val outNeighborGraph = purGraph.collectNeighbors(EdgeDirection.Out)
    // Now join the in neighbor vertex id list to every vertex's property
    val dataGraph = purGraph.outerJoinVertices(inNeighborGraph)((vid, property, inNeighborList) => {
      val inNeighborVertexIds =
        inNeighborList.getOrElse(Array[(VertexId, VertexProperty)]()).map(t => t._1)
      property.inNeighbor ++= inNeighborVertexIds.toBuffer
      property
    })
    // !!!!!!!!


    // In the Pregel operation, we do nothing. Doing nothing is itself enough to reveal the bug.
    val result = dataGraph.pregel(Array[Long](), maxIterations = 1, EdgeDirection.Both)(
      // vertex program
      (id, property, msg) => vertexProgram(id, property, msg),
      // send messages
      triplet => sendMessage(triplet),
      // combine messages
      (a, b) => messageCombiner(a, b)
    )
    //close spark
    sc.stop()
  }

  // In the vertex program, we change nothing...
  def vertexProgram(id: VertexId, property: VertexProperty, msgSum: Array[Long]): VertexProperty = {
    // Print the inNeighbor property of vertex 1.
    // In the vertex program phase, the length of vertex 1's inNeighbor ArrayBuffer is 2.
    // However, in the send message phase, the length of this ArrayBuffer has changed to 0.
    // The two lengths are inconsistent even though we change nothing in the vertex program.

    if (id == 1L)
      println("[Vertex Program]Vertex 1's inNeighbor property length is:" + property.inNeighbor.length)

    property
  }

  def sendMessage(edge: EdgeTriplet[VertexProperty, Null]): Iterator[(VertexId, Array[Long])] = {
    // Print vertex 1's inNeighbor length
    if (edge.srcId == 1L)
      println("[Send Message] Vertex 1's inNeighbor property length is:" + edge.srcAttr.inNeighbor.length)
    if (edge.dstId == 1L)
      println("[Send Message] Vertex 1's inNeighbor property length is:" + edge.dstAttr.inNeighbor.length)

    val sIn = edge.srcAttr.inNeighbor
    val dIn = edge.dstAttr.inNeighbor

    //send empty message
    ArrayBuffer[(VertexId,Array[Long])]().toIterator
  }

  def messageCombiner(a: Array[Long], b: Array[Long]): Array[Long] = { Array.concat(a, b) }
}