Hi all,
I have recently run into strange behavior in GraphX's
|collectNeighbors| API. It seems to have side effects on the
Pregel API, which then does not work as expected. Below is a
small code demo that reproduces the behavior. The
whole source code is in the attachment.
Steps to reproduce the side effects:
1. Create a little toy graph with a simple vertex property type:
|class VertexProperty(val inNeighbor: ArrayBuffer[Long] = ArrayBuffer[Long]())
  extends Serializable { }

// Create a data graph. Vertices: 1, 2, 3; Edges: 2 -> 1, 3 -> 1, 2 -> 3.
......
val purGraph = Graph(dataVertex, dataEdge).persist() |
2. Call |collectNeighbors| to get both the in-neighbor graph and the
out-neighbor graph of |purGraph|. Then outer join
|inNeighborGraph| with |purGraph| to get |dataGraph|:
|// Get inNeighbor and outNeighbor graph from purGraph
val inNeighborGraph = purGraph.collectNeighbors(EdgeDirection.In)
// !!May be BUG here!! If we don't collect out neighbors here, the bug will disappear.
val outNeighborGraph = purGraph.collectNeighbors(EdgeDirection.Out)
// Now join the in neighbor vertex id list to every vertex's property
val dataGraph = purGraph.outerJoinVertices(inNeighborGraph)((vid, property, inNeighborList) => {
  val inNeighborVertexIds =
    inNeighborList.getOrElse(Array[(VertexId, VertexProperty)]()).map(t => t._1)
  property.inNeighbor ++= inNeighborVertexIds.toBuffer
  property
}) |
3. Run a simple Pregel computation on |dataGraph|. In the vertex
program phase we do nothing except print some debug info.
However, in the send message phase we find that the |inNeighbor|
property of vertex 1 has changed! *The |inNeighbor| property values
of vertex 1 are inconsistent between the vertex program phase and
the send message phase!*
|val result = dataGraph.pregel(Array[Long](), maxIterations = 1, EdgeDirection.Both)(
  // vertex program
  (id, property, msg) => vertexProgram(id, property, msg),
  // send messages
  triplet => sendMessage(triplet),
  // combine messages
  (a, b) => messageCombiner(a, b)
)

// In the vertex program, we change nothing...
def vertexProgram(id: VertexId, property: VertexProperty, msgSum: Array[Long]): VertexProperty = {
  if (id == 1L)
    println("[Vertex Program]Vertex 1's inNeighbor property length is:" + property.inNeighbor.length)
  property
}

// In the send message phase, we just check the vertex property of the same vertex.
// We should get the same results in the two phases.
def sendMessage(edge: EdgeTriplet[VertexProperty, Null]): Iterator[(VertexId, Array[Long])] = {
  // Print vertex 1's inNeighbor length
  if (edge.srcId == 1L)
    println("[Send Message] Vertex 1's inNeighbor property length is:" + edge.srcAttr.inNeighbor.length)
  if (edge.dstId == 1L)
    println("[Send Message] Vertex 1's inNeighbor property length is:" + edge.dstAttr.inNeighbor.length)
  val sIn = edge.srcAttr.inNeighbor
  val dIn = edge.dstAttr.inNeighbor
  // send empty message
  ArrayBuffer[(VertexId, Array[Long])]().toIterator
}

def messageCombiner(a: Array[Long], b: Array[Long]): Array[Long] = {
  Array.concat(a, b)
} |
In the program output, we get:
|[Vertex Program]Vertex 1's inNeighbor property length is:2
[Send Message] Vertex 1's inNeighbor property length is:0
[Send Message] Vertex 1's inNeighbor property length is:0 |
4. Stranger still, if we comment out one of the |collectNeighbors|
calls, everything works. As you may have noticed, we never actually
use |outNeighborGraph| in the program, so we can comment out the
following statement:
|// val outNeighborGraph = purGraph.collectNeighbors(EdgeDirection.Out) |
With that statement commented out, the lengths in the two phases are
consistent:
|[Vertex Program]Vertex 1's inNeighbor property length is:2
[Send Message] Vertex 1's inNeighbor property length is:2
[Send Message] Vertex 1's inNeighbor property length is:2 |
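One detail in step 2 that may be relevant: the join function appends to the
|ArrayBuffer| inside the |VertexProperty| it is handed and returns that same
object, rather than building a new one. Whether this is what triggers the
inconsistency has not been confirmed here, but transformations over cached
RDDs are generally expected not to mutate their inputs, so a non-mutating
join is an obvious variant to compare against. The sketch below is plain
Scala without Spark; |joinInPlace| and |joinCopy| are hypothetical helper
names for illustration, not GraphX API.

```scala
import scala.collection.mutable.ArrayBuffer

// Same vertex property type as in step 1.
class VertexProperty(val inNeighbor: ArrayBuffer[Long] = ArrayBuffer[Long]())
  extends Serializable

// Hypothetical helpers (not part of GraphX) contrasting the two join styles.
object JoinSketch {
  // Mutating variant, as in step 2: appends to the buffer owned by the
  // input property and returns the very same object.
  def joinInPlace(property: VertexProperty, ids: Option[Array[Long]]): VertexProperty = {
    property.inNeighbor ++= ids.getOrElse(Array.empty[Long])
    property
  }

  // Non-mutating variant: copies the existing ids into a fresh buffer and
  // wraps it in a new VertexProperty, leaving the input untouched.
  def joinCopy(property: VertexProperty, ids: Option[Array[Long]]): VertexProperty = {
    val merged = ArrayBuffer[Long]()
    merged ++= property.inNeighbor
    merged ++= ids.getOrElse(Array.empty[Long])
    new VertexProperty(merged)
  }
}
```

In step 2, the |outerJoinVertices| closure could then return something like
|JoinSketch.joinCopy(property, inNeighborList.map(_.map(_._1)))| in place of
the |++=| on |property.inNeighbor|, which would make it easy to see whether
the send message phase still reports a length of 0.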
The behavior of |collectNeighbors| is strange. It may be a bug in
GraphX, or I may be calling this API improperly, or my vertex property
type may be unsuitable. Could you please comment on this? Thanks a lot.
Regards,
Zhaokang Wang
PS. We have tested the code on Spark 1.5.1 and 1.6.0.
import org.apache.spark.{graphx, SparkConf, SparkContext}
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.EdgeDirection
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer
class VertexProperty(val inNeighbor: ArrayBuffer[Long] = ArrayBuffer[Long]())
  extends Serializable { }

object Main {
  def main(args: Array[String]): Unit = {
    // begin Spark job
    val conf = new SparkConf().setAppName("finding pregel API bug").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Create a data graph. Vertices: 1, 2, 3; Edges: 2 -> 1, 3 -> 1, 2 -> 3.
    // Every vertex has a property of the VertexProperty class.
    val dataEdge =
      sc.makeRDD(Array[Edge[Null]](Edge(2L, 1L, null), Edge(3L, 1L, null), Edge(2L, 3L, null)))
    val dataVertex: RDD[(VertexId, VertexProperty)] = sc.makeRDD(Array(
      (1L, new VertexProperty()),
      (2L, new VertexProperty()),
      (3L, new VertexProperty())))
    val purGraph = Graph(dataVertex, dataEdge).persist()

    // !!!!!!!!!!!!!!!!!!!!!
    // Get inNeighbor and outNeighborGraph from purGraph
    val inNeighborGraph = purGraph.collectNeighbors(EdgeDirection.In)
    // !!May be BUG here!! If we don't collect out neighbors here, the bug will disappear.
    val outNeighborGraph = purGraph.collectNeighbors(EdgeDirection.Out)

    // Now join the in neighbor vertex id list to every vertex's property
    val dataGraph = purGraph.outerJoinVertices(inNeighborGraph)((vid, property, inNeighborList) => {
      val inNeighborVertexIds =
        inNeighborList.getOrElse(Array[(VertexId, VertexProperty)]()).map(t => t._1)
      property.inNeighbor ++= inNeighborVertexIds.toBuffer
      property
    })

    // !!!!!!!!
    // In the Pregel operation, we will do nothing. Doing nothing itself will reveal the bug.
    val result = dataGraph.pregel(Array[Long](), maxIterations = 1, EdgeDirection.Both)(
      // vertex program
      (id, property, msg) => vertexProgram(id, property, msg),
      // send messages
      triplet => sendMessage(triplet),
      // combine messages
      (a, b) => messageCombiner(a, b)
    )

    // close spark
    sc.stop()
  }

  // In the vertex program, we change nothing...
  def vertexProgram(id: VertexId, property: VertexProperty, msgSum: Array[Long]): VertexProperty = {
    // Print the inNeighbor property of the vertex 1.
    // As we can see, when we are in the vertex program phase, the vertex 1's
    // inNeighbor ArrayBuffer's length is 2.
    // However, in the send message phase, this inNeighbor ArrayBuffer's length
    // will be changed to 0.
    // The two lengths are inconsistent but we change nothing in the vertex program.
    if (id == 1L)
      println("[Vertex Program]Vertex 1's inNeighbor property length is:" + property.inNeighbor.length)
    property
  }

  def sendMessage(edge: EdgeTriplet[VertexProperty, Null]): Iterator[(VertexId, Array[Long])] = {
    // Print vertex 1's inNeighbor length
    if (edge.srcId == 1L)
      println("[Send Message] Vertex 1's inNeighbor property length is:" + edge.srcAttr.inNeighbor.length)
    if (edge.dstId == 1L)
      println("[Send Message] Vertex 1's inNeighbor property length is:" + edge.dstAttr.inNeighbor.length)
    val sIn = edge.srcAttr.inNeighbor
    val dIn = edge.dstAttr.inNeighbor
    // send empty message
    ArrayBuffer[(VertexId, Array[Long])]().toIterator
  }

  def messageCombiner(a: Array[Long], b: Array[Long]): Array[Long] = {
    Array.concat(a, b)
  }
}