[ https://issues.apache.org/jira/browse/SPARK-34517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
MiKou updated SPARK-34517:
--------------------------
    Attachment:     (was: bug-1.png)

> Unexpected result when using 'aggregateMessages' with 'outerJoinVertices'
> -------------------------------------------------------------------------
>
>                 Key: SPARK-34517
>                 URL: https://issues.apache.org/jira/browse/SPARK-34517
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX
>    Affects Versions: 3.0.2
>         Environment: OpenJDK 1.8.0_275
> Scala 2.12.10
> Hadoop 3.3.0
> Spark 3.0.2
>            Reporter: MiKou
>            Priority: Critical
>
> I use 'Graph.aggregateMessages' and 'Graph.outerJoinVertices' together to
> implement an iterative graph algorithm that is very similar to
> 'Graph.staticParallelPersonalizedPageRank'.
>
> A bug appears when 'outerJoinVertices' does not change the data type of the
> vertices. More specifically, 'aggregateMessages' gives the same results in
> some iterations even though the vertex values have changed and the content
> of each message depends entirely on the value of its source vertex.
>
> As a temporary workaround, I change the vertex data type in
> 'outerJoinVertices' and then change it back with 'mapVertices'. This
> suggests that the problem is probably due to activeSet being updated
> incorrectly when 'outerJoinVertices' keeps the data type of the vertices.
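The workaround described above could be sketched roughly as follows. This is only an illustration, not the reporter's actual code: the helper name `joinWithFullReplication` and its `update` parameter are hypothetical, and wrapping the result in `Tuple1` is just one possible way to make the vertex type differ temporarily. The sketch assumes that GraphX re-ships all vertex attributes to the edge partitions whenever the vertex type changes, which is what the report suggests but does not confirm.

```scala
import org.apache.spark.graphx.{Graph, VertexId, VertexRDD}
import scala.reflect.ClassTag

object OuterJoinWorkaround {
  // Hypothetical helper: performs the join through a temporary wrapper type
  // (Tuple1[VD]) so that the vertex type differs from VD during the join,
  // then unwraps it again with mapVertices.
  def joinWithFullReplication[VD: ClassTag, ED: ClassTag, U: ClassTag](
      graph: Graph[VD, ED],
      updates: VertexRDD[U])(
      update: (VertexId, VD, Option[U]) => VD): Graph[VD, ED] = {
    graph
      // Step 1: the join result type is Tuple1[VD], not VD.
      .outerJoinVertices[U, Tuple1[VD]](updates) { (vid, attr, msgOpt) =>
        Tuple1(update(vid, attr, msgOpt))
      }
      // Step 2: change the type back to VD.
      .mapVertices[VD] { (_, wrapped) => wrapped._1 }
  }
}
```

With the code from this report, the call would presumably look like `joinWithFullReplication(prevRankGraph, rankUpdates) { case (vid, (deg, rsd), msgSumOpt) => (deg, msgSumOpt.getOrElse(zero) *:* (1.0 - alpha)) }.cache()`. The cost of this approach is that every vertex is likely re-sent in every iteration, so it trades performance for correctness.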
>
> Example code:
>
> {code:java}
> val prevRankGraph = rankGraph
> val rankUpdates = prevRankGraph.aggregateMessages[Vector[Double]](
>   ctx => {
>     println(ctx.srcId.nodeId, " pushing value: " + ctx.srcAttr._2)
>     ctx.sendToDst(ctx.srcAttr._2 *:* ctx.attr)
>   },
>   (a : Vector[Double], b : Vector[Double]) => a +:+ b,
>   TripletFields.Src
> )
> rankGraph = prevRankGraph.outerJoinVertices(rankUpdates) { case (vid, (deg, rsd), msgSumOpt) =>
>   println(vid.nodeId, " value: " + rsd.toString())
>   (deg, msgSumOpt.getOrElse(zero) *:* (1.0 - alpha))
> }.cache()
> {code}
> Example graph:
> {code:java}
> 0 6
> 0 6
> 1 0
> 1 5
> 1 6
> 2 0
> 2 7
> 3 2
> 3 6
> 4 7
> 5 1
> 6 8
> 7 5
> 7 9
> 8 0
> 9 6
> {code}
>
> Executor1's log:
> {code:java}
> (1, pushing value: SparseVector(10)())
> (1, pushing value: SparseVector(10)())
> (1, pushing value: SparseVector(10)())
> (5, pushing value: SparseVector(10)())
> (1, value: SparseVector(10)())
> (2, value: SparseVector(10)())
> (3, value: SparseVector(10)((3,1.0)))
> (8, value: SparseVector(10)())
> (1, pushing value: SparseVector(10)())
> (1, pushing value: SparseVector(10)())
> (1, pushing value: SparseVector(10)())
> (5, pushing value: SparseVector(10)())
> (1, value: SparseVector(10)())
> (2, value: SparseVector(10)((3,0.4)))
> #On the vertex view, node-3 has still no value in the second round.#
> (3, value: SparseVector(10)())
> (8, value: SparseVector(10)())
> {code}
> Executor2's log:
> {code:java}
> (3, pushing value: SparseVector(10)((3,1.0)))
> (3, pushing value: SparseVector(10)((3,1.0)))
> (7, pushing value: SparseVector(10)())
> (7, pushing value: SparseVector(10)())
> (9, pushing value: SparseVector(10)())
> (9, value: SparseVector(10)())
> (0, value: SparseVector(10)())
> #On the edge view, the value of node-3 does not change in the second round.#
> (3, pushing value: SparseVector(10)((3,1.0)))
> (3, pushing value: SparseVector(10)((3,1.0)))
> (7, pushing value: SparseVector(10)())
> (7, pushing value: SparseVector(10)())
> (9, pushing value: SparseVector(10)())
> (9, value: SparseVector(10)())
> (0, value: SparseVector(10)()){code}
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org