[ https://issues.apache.org/jira/browse/SPARK-34517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
MiKou updated SPARK-34517:
--------------------------
Description:
I am using 'Graph.aggregateMessages' and 'Graph.outerJoinVertices' together to implement an iterative graph algorithm that is very similar to 'Graph.staticParallelPersonalizedPageRank'.

The results are wrong when 'outerJoinVertices' does not change the data type of the vertices. In some iterations, 'aggregateMessages' returns the same results as in the previous iteration, even though the vertex values have changed and the value of each message depends entirely on the value of its source vertex.

As a temporary workaround, I change the data type in 'outerJoinVertices' and then change it back with 'mapVertices', which produces the expected results. This suggests that the problem is probably caused by incorrect updating of the activeSet when 'outerJoinVertices' keeps the vertex data type.

Example code:
{code:java}
// Imports assumed by this snippet (Breeze provides *:* and +:+). rankGraph (a var),
// zero and alpha are defined elsewhere; .nodeId is a helper from the reporter's code, not GraphX.
import breeze.linalg.Vector
import org.apache.spark.graphx.TripletFields

val prevRankGraph = rankGraph
// Each source vertex pushes its residual vector, scaled by the edge attribute, to its destination.
val rankUpdates = prevRankGraph.aggregateMessages[Vector[Double]](
  ctx => {
    println(ctx.srcId.nodeId, " pushing value: " + ctx.srcAttr._2)
    ctx.sendToDst(ctx.srcAttr._2 *:* ctx.attr)
  },
  (a: Vector[Double], b: Vector[Double]) => a +:+ b, // sum incoming messages element-wise
  TripletFields.Src                                   // only the source attribute is read
)
// Replace each residual with the scaled message sum; the vertex type (deg, rsd) is unchanged.
rankGraph = prevRankGraph.outerJoinVertices(rankUpdates) { case (vid, (deg, rsd), msgSumOpt) =>
  println(vid.nodeId, " value: " + rsd.toString())
  (deg, msgSumOpt.getOrElse(zero) *:* (1.0 - alpha))
}.cache()
{code}

Example graph:
{code:java}
0 6
0 6
1 0
1 5
1 6
2 0
2 7
3 2
3 6
4 7
5 1
6 8
7 5
7 9
8 0
9 6
{code}

> Unexpected result when using 'aggregateMessages' with 'outerJoinVertices'
> -------------------------------------------------------------------------
>
>                 Key: SPARK-34517
>                 URL: https://issues.apache.org/jira/browse/SPARK-34517
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX
>    Affects Versions: 3.0.2
>         Environment: OpenJDK 1.8.0_275
>                      Scala 2.12.10
>                      Hadoop 3.3.0
>                      Spark 3.0.2
>            Reporter: MiKou
>            Priority: Critical
>         Attachments: bug-1.png, bug-2.png
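The workaround described in the report (changing the vertex type in 'outerJoinVertices', then restoring it with 'mapVertices') can be sketched as follows. This is a minimal sketch under stated assumptions, not the reporter's code: ranks are plain Double values and edge attributes are Double weights instead of the reporter's (degree, Breeze vector) tuples, and the names TypeSwapWorkaround, Wrapped and step are hypothetical. For context, GraphX's GraphImpl gives 'outerJoinVertices' and 'mapVertices' an implicit `VD =:= VD2` parameter that defaults to null; when the vertex type is unchanged, only the vertices that VertexRDD.diff reports as changed are shipped to the edge partitions, while a type change re-replicates all vertices. That difference may explain why the workaround helps, but it is not a confirmed root cause.

```scala
import org.apache.spark.graphx.{Graph, TripletFields, VertexId, VertexRDD}

object TypeSwapWorkaround {
  // Hypothetical wrapper: its only job is to make the intermediate vertex type differ
  // from Double, so no implicit `Double =:= Wrapped[Double]` evidence can be found.
  final case class Wrapped[A](value: A)

  /** One propagation step (hypothetical helper): new rank = (1 - alpha) * sum of
    * srcRank * edgeWeight over in-edges, with 0.0 when no message arrives. */
  def step(g: Graph[Double, Double], alpha: Double): Graph[Double, Double] = {
    // Each source vertex sends its rank, scaled by the edge weight, to the destination.
    val msgSum: VertexRDD[Double] = g.aggregateMessages[Double](
      ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr),
      _ + _,
      TripletFields.Src)

    // First half of the workaround: return Wrapped[Double], so VD2 != VD and
    // outerJoinVertices takes its "type changed" path.
    val wrapped: Graph[Wrapped[Double], Double] =
      g.outerJoinVertices(msgSum) { (_: VertexId, _: Double, sumOpt: Option[Double]) =>
        Wrapped(sumOpt.getOrElse(0.0) * (1.0 - alpha))
      }

    // Second half: mapVertices restores the original Double vertex type.
    wrapped.mapVertices((_, w) => w.value).cache()
  }
}
```

Only the vertex type seen by 'outerJoinVertices' and 'mapVertices' differs from the original code; the message and merge functions are unchanged, so any difference in results between the two versions points at the type-preserving path rather than at the algorithm.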
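To check what one iteration of the reported code should produce on the example graph, the same step can be recomputed without GraphX. The following is a minimal plain-Scala sketch under stated assumptions: ranks are scalar Double values rather than Breeze vectors, each edge's weight is 1 / outDegree(src) (the weighting GraphX's own PageRank uses), there is no reset term (matching the reported update), and ReferenceStep is a hypothetical name. The duplicated edge `0 6` is kept as two separate edges, as GraphX does unless the edges are merged (for example with groupEdges).

```scala
// Plain-Scala reference for one aggregateMessages + outerJoinVertices step on the
// example graph. Assumptions (not from the issue): scalar ranks, edge weight
// 1 / outDegree(src), and no reset term. ReferenceStep is a hypothetical name.
object ReferenceStep {
  // Example graph from the issue as (src, dst) pairs; "0 6" appears twice.
  val edges: Seq[(Long, Long)] = Seq(
    0L -> 6L, 0L -> 6L, 1L -> 0L, 1L -> 5L, 1L -> 6L, 2L -> 0L, 2L -> 7L, 3L -> 2L,
    3L -> 6L, 4L -> 7L, 5L -> 1L, 6L -> 8L, 7L -> 5L, 7L -> 9L, 8L -> 0L, 9L -> 6L)

  // Out-degree per source vertex; the duplicated edge counts twice.
  val outDeg: Map[Long, Int] =
    edges.groupBy(_._1).map { case (v, out) => v -> out.size }

  /** new(v) = (1 - alpha) * sum over edges u -> v of rank(u) / outDeg(u);
    * vertices that receive no message get 0.0, like getOrElse(zero). */
  def step(rank: Map[Long, Double], alpha: Double): Map[Long, Double] = {
    // sendToDst: one message per edge, computed only from the source vertex.
    val messages = edges.map { case (src, dst) => dst -> rank(src) / outDeg(src) }
    // mergeMsg: sum all messages that arrive at the same destination.
    val msgSum: Map[Long, Double] =
      messages.groupBy(_._1).map { case (v, ms) => v -> ms.map(_._2).sum }
    // outerJoinVertices: every vertex is rewritten from its message sum.
    rank.map { case (v, _) => v -> msgSum.getOrElse(v, 0.0) * (1.0 - alpha) }
  }
}
```

Starting from a rank of 1.0 on vertices 0 to 9 with alpha = 0.15, one step gives vertex 6 a rank of (17/6) * 0.85 ≈ 2.4083, while vertices 3 and 4 (which have no in-edges) get 0.0. Because every message depends only on its source's current rank, successive steps should differ whenever the source ranks change; comparing these values against GraphX output per iteration is one way to spot the stale aggregates described in the report.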
--
This message was sent by Atlassian Jira
(v8.3.4#803005)