[ https://issues.apache.org/jira/browse/SPARK-34517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
MiKou updated SPARK-34517:
--------------------------
Description:
I am using 'Graph.aggregateMessages' and 'Graph.outerJoinVertices' together to implement an iterative graph algorithm that is very similar to 'Graph.staticParallelPersonalizedPageRank'.

A bug occurs when 'outerJoinVertices' does not change the data type of the vertices. More specifically, 'aggregateMessages' returns the same results across some iterations, even though the vertex values have changed and each message's attribute depends entirely on the value of its source vertex.

As a temporary workaround, I change the data type in 'outerJoinVertices' and then change it back with 'mapVertices'. This suggests that the problem is probably caused by an incorrect update of the activeSet when 'outerJoinVertices' keeps the data type of the vertices.

Example code:
{code:scala}
// One loop iteration. Vector is breeze.linalg.Vector; rankGraph (a var), zero and alpha are defined elsewhere.
val prevRankGraph = rankGraph
val rankUpdates = prevRankGraph.aggregateMessages[Vector[Double]](
  ctx => {
    println(ctx.srcId.nodeId, " pushing value: " + ctx.srcAttr._2)
    ctx.sendToDst(ctx.srcAttr._2 *:* ctx.attr)
  },
  (a: Vector[Double], b: Vector[Double]) => a +:+ b,
  TripletFields.Src
)
rankGraph = prevRankGraph.outerJoinVertices(rankUpdates) { case (vid, (deg, rsd), msgSumOpt) =>
  println(vid.nodeId, " value: " + rsd.toString()) // rsd is the value before this update
  (deg, msgSumOpt.getOrElse(zero) *:* (1.0 - alpha))
}.cache()
{code}

Example graph:
{code:java}
0 6
1 0
1 5
1 6
2 0
2 7
3 2
3 6
4 7
5 1
6 8
7 5
7 9
8 0
9 6
{code}

Log of Executor1:
{code:java}
(1, pushing value: SparseVector(10)())
(1, pushing value: SparseVector(10)())
(1, pushing value: SparseVector(10)())
(5, pushing value: SparseVector(10)())
(1, value: SparseVector(10)())
(2, value: SparseVector(10)())
(3, value: SparseVector(10)((3,1.0)))
(8, value: SparseVector(10)())
(1, pushing value: SparseVector(10)())
(1, pushing value: SparseVector(10)())
(1, pushing value: SparseVector(10)())
(5, pushing value: SparseVector(10)())
(1, value: SparseVector(10)())
(2, value: SparseVector(10)((3,0.4)))
#On the vertex view, node-3 still has no value in the second round.#
(3, value: SparseVector(10)())
(8, value: SparseVector(10)())
{code}

Log of Executor2:
{code:java}
(3, pushing value: SparseVector(10)((3,1.0)))
(3, pushing value: SparseVector(10)((3,1.0)))
(7, pushing value: SparseVector(10)())
(7, pushing value: SparseVector(10)())
(9, pushing value: SparseVector(10)())
(9, value: SparseVector(10)())
(0, value: SparseVector(10)())
#On the edge view, the value of node-3 does not change in the second round.#
(3, pushing value: SparseVector(10)((3,1.0)))
(3, pushing value: SparseVector(10)((3,1.0)))
(7, pushing value: SparseVector(10)())
(7, pushing value: SparseVector(10)())
(9, pushing value: SparseVector(10)())
(9, value: SparseVector(10)())
(0, value: SparseVector(10)())
(3, pushing value: SparseVector(10)())
(3, pushing value: SparseVector(10)())
(7, pushing value: SparseVector(10)((3,0.16000000000000003)))
(7, pushing value: SparseVector(10)((3,0.16000000000000003)))
(9, pushing value: SparseVector(10)())
(6, value: SparseVector(10)())
{code}

Log of Executor3:
{code:java}
(0, pushing value: SparseVector(10)())
(2, pushing value: SparseVector(10)())
(2, pushing value: SparseVector(10)())
(9, value: SparseVector(10)())
(0, value: SparseVector(10)())
(0, pushing value: SparseVector(10)())
#The value of node-2 is updated correctly on the edge view.#
(2, pushing value: SparseVector(10)((3,0.4)))
(2, pushing value: SparseVector(10)((3,0.4)))
(9, value: SparseVector(10)())
(0, value: SparseVector(10)())
(0, pushing value: SparseVector(10)((3,0.16000000000000003)))
#The value of node-2 is also updated correctly in the third round.#
(2, pushing value: SparseVector(10)())
(2, pushing value: SparseVector(10)())
(9, value: SparseVector(10)())
(0, value: SparseVector(10)((3,0.16000000000000003)))
{code}

Log of Executor4:
{code:java}
(4, pushing value: SparseVector(10)())
(6, pushing value: SparseVector(10)())
(8, pushing value: SparseVector(10)())
(4, value: SparseVector(10)())
(7, value: SparseVector(10)())
(5, value: SparseVector(10)())
(4, pushing value: SparseVector(10)())
#The value of node-6 is updated correctly on the edge view.#
(6, pushing value: SparseVector(10)((3,0.4)))
(8, pushing value: SparseVector(10)())
(4, value: SparseVector(10)())
(7, value: SparseVector(10)())
(5, value: SparseVector(10)())
(4, pushing value: SparseVector(10)())
#The value of node-6 is also updated correctly in the third round.#
(6, pushing value: SparseVector(10)())
(8, pushing value: SparseVector(10)((3,0.32000000000000006)))
(4, value: SparseVector(10)())
(7, value: SparseVector(10)((3,0.16000000000000003)))
(5, value: SparseVector(10)())
{code}

was:
I am using 'Graph.aggregateMessages' and 'Graph.outerJoinVertices' together to implement an iterative graph algorithm that is very similar to 'Graph.staticParallelPersonalizedPageRank'.

A bug occurs when 'outerJoinVertices' does not change the data type of the vertices. More specifically, 'aggregateMessages' returns the same results across some iterations, even though the vertex values have changed and each message's attribute depends entirely on the value of its source vertex.

As a temporary workaround, I change the data type in 'outerJoinVertices' and then change it back with 'mapVertices'. This suggests that the problem is probably caused by an incorrect update of the activeSet when 'outerJoinVertices' keeps the data type of the vertices.
Example code:
{code:scala}
// One loop iteration. Vector is breeze.linalg.Vector; rankGraph (a var), zero and alpha are defined elsewhere.
val prevRankGraph = rankGraph
val rankUpdates = prevRankGraph.aggregateMessages[Vector[Double]](
  ctx => {
    println(ctx.srcId.nodeId, " pushing value: " + ctx.srcAttr._2)
    ctx.sendToDst(ctx.srcAttr._2 *:* ctx.attr)
  },
  (a: Vector[Double], b: Vector[Double]) => a +:+ b,
  TripletFields.Src
)
rankGraph = prevRankGraph.outerJoinVertices(rankUpdates) { case (vid, (deg, rsd), msgSumOpt) =>
  println(vid.nodeId, " value: " + rsd.toString()) // rsd is the value before this update
  (deg, msgSumOpt.getOrElse(zero) *:* (1.0 - alpha))
}.cache()
{code}

Example graph:
{code:java}
0 6
1 0
1 5
1 6
2 0
2 7
3 2
3 6
4 7
5 1
6 8
7 5
7 9
8 0
9 6
{code}

Log of Executor1:
{code:java}
(1, pushing value: SparseVector(10)())
(1, pushing value: SparseVector(10)())
(1, pushing value: SparseVector(10)())
(5, pushing value: SparseVector(10)())
(1, value: SparseVector(10)())
(2, value: SparseVector(10)())
(3, value: SparseVector(10)((3,1.0)))
(8, value: SparseVector(10)())
(1, pushing value: SparseVector(10)())
(1, pushing value: SparseVector(10)())
(1, pushing value: SparseVector(10)())
(5, pushing value: SparseVector(10)())
(1, value: SparseVector(10)())
(2, value: SparseVector(10)((3,0.4)))
#On the vertex view, node-3 still has no value in the second round.#
(3, value: SparseVector(10)())
(8, value: SparseVector(10)())
{code}

Log of Executor2:
{code:java}
(3, pushing value: SparseVector(10)((3,1.0)))
(3, pushing value: SparseVector(10)((3,1.0)))
(7, pushing value: SparseVector(10)())
(7, pushing value: SparseVector(10)())
(9, pushing value: SparseVector(10)())
(9, value: SparseVector(10)())
(0, value: SparseVector(10)())
#On the edge view, the value of node-3 does not change in the second round.#
(3, pushing value: SparseVector(10)((3,1.0)))
(3, pushing value: SparseVector(10)((3,1.0)))
(7, pushing value: SparseVector(10)())
(7, pushing value: SparseVector(10)())
(9, pushing value: SparseVector(10)())
(9, value: SparseVector(10)())
(0, value: SparseVector(10)())
{code}

> Unexpected result when using 'aggregateMessages' with 'outerJoinVertices'
> -------------------------------------------------------------------------
>
> Key: SPARK-34517
> URL: https://issues.apache.org/jira/browse/SPARK-34517
> Project: Spark
> Issue Type: Bug
> Components: GraphX
> Affects Versions: 3.0.2
> Environment: OpenJDK 1.8.0_275
> Scala 2.12.10
> Hadoop 3.3.0
> Spark 3.0.2
> Reporter: MiKou
> Priority: Critical
>
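For reference, the workaround described in the issue can be wrapped in a small helper. The workaround changes the vertex type in 'outerJoinVertices' and then changes it back with 'mapVertices'.

This appears to work because of how GraphImpl is written in Spark 3.0, as far as I can tell. Both 'outerJoinVertices' and 'mapVertices' take an implicit 'VD =:= VD2' argument that defaults to null:
* When that evidence exists (the vertex type is preserved), only the vertices that 'diff' reports as changed are shipped to the replicated vertex view.
* When it is absent, the graph is rebuilt from the new vertices and the existing edges, so every vertex attribute is re-shipped.

The sketch relies on that second path. It is a minimal, hypothetical example, not a fix:
* 'Box', 'outerJoinVerticesRebuilt', 'exampleEdges' and 'run' are made-up names.
* 'run' does not reproduce the Breeze-vector update from the report. It uses a simplified, undamped propagation with plain Double values on the example graph: vertex 3 starts at 1.0, and each vertex passes value / outDegree along its out-edges.

```scala
import scala.reflect.ClassTag

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object OuterJoinRebuildSketch {
  // Hypothetical wrapper. Box[VD] is a different type from VD, so the compiler finds no
  // `VD =:= VD2` evidence and outerJoinVertices re-replicates every vertex attribute.
  final case class Box[A](value: A)

  // Hypothetical helper for the workaround: join into Box[VD], then unwrap with mapVertices
  // (Box[VD] -> VD is again a type change, so mapVertices also skips the incremental path).
  def outerJoinVerticesRebuilt[VD: ClassTag, ED: ClassTag, U: ClassTag](
      g: Graph[VD, ED], other: RDD[(VertexId, U)])(
      f: (VertexId, VD, Option[U]) => VD): Graph[VD, ED] =
    g.outerJoinVertices(other)((vid, vd, msg) => Box(f(vid, vd, msg)))
      .mapVertices((_, boxed) => boxed.value)

  // The example graph from the issue, one (src, dst) pair per line of the edge list.
  val exampleEdges: Seq[(VertexId, VertexId)] = Seq(
    (0L, 6L), (1L, 0L), (1L, 5L), (1L, 6L), (2L, 0L), (2L, 7L), (3L, 2L), (3L, 6L),
    (4L, 7L), (5L, 1L), (6L, 8L), (7L, 5L), (7L, 9L), (8L, 0L), (9L, 6L))

  // Simplified, undamped propagation (an assumption for illustration): vertex 3 starts
  // with 1.0, and each round every vertex sends value / outDegree along its out-edges;
  // a vertex's new value is the sum it receives (0.0 if it receives nothing).
  def run(sc: SparkContext, rounds: Int): Map[VertexId, Double] = {
    val base = Graph.fromEdgeTuples(sc.parallelize(exampleEdges, 4), ())
    // Vertex attribute: (outDegree, value). Edge attribute: 1.0 / outDegree of the source.
    var g: Graph[(Int, Double), Double] = base
      .outerJoinVertices(base.outDegrees)((_, _, degOpt) => degOpt.getOrElse(0))
      .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src)
      .mapVertices((vid, deg) => (deg, if (vid == 3L) 1.0 else 0.0))
      .cache()
    for (_ <- 1 to rounds) {
      val prev = g
      val updates = prev.aggregateMessages[Double](
        ctx => ctx.sendToDst(ctx.srcAttr._2 * ctx.attr),
        _ + _,
        TripletFields.Src)
      g = outerJoinVerticesRebuilt(prev, updates) {
        case (_, (deg, _), msgOpt) => (deg, msgOpt.getOrElse(0.0))
      }.cache()
    }
    g.vertices.collect().map { case (vid, (_, value)) => vid -> value }.toMap
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("SPARK-34517-sketch"))
    try run(sc, 3).toSeq.sortBy(_._1).foreach(println)
    finally sc.stop()
  }
}
```

Inside the loop from the report, the call would become:

'rankGraph = outerJoinVerticesRebuilt(prevRankGraph, rankUpdates) { case (vid, (deg, rsd), msgSumOpt) => (deg, msgSumOpt.getOrElse(zero) *:* (1.0 - alpha)) }.cache()'

The trade-off is the extra 'mapVertices' pass plus full re-replication of all vertex attributes every iteration, instead of shipping only the changed ones.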
--
This message was sent by Atlassian Jira
(v8.3.4#803005)