[ https://issues.apache.org/jira/browse/SPARK-34517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
MiKou updated SPARK-34517:
--------------------------
Summary: breeze.linalg.Vector.equals always return true may bring error.  (was: Unexpected result when using 'aggregateMessages' with 'outerJoinVertices')

Key: SPARK-34517
URL: https://issues.apache.org/jira/browse/SPARK-34517
Project: Spark
Issue Type: Bug
Components: GraphX
Affects Versions: 3.0.2
Environment: OpenJDK 1.8.0_275, Scala 2.12.10, Hadoop 3.3.0, Spark 3.0.2
Reporter: MiKou
Priority: Major

I am using 'Graph.aggregateMessages' and 'Graph.outerJoinVertices' together to implement an iterative graph algorithm that is very similar to 'Graph.staticParallelPersonalizedPageRank'.

A bug appears when 'outerJoinVertices' does not change the data type of the vertices. More specifically, in some iterations 'aggregateMessages' returns the same results as in the previous iteration even though the vertex values have changed. Because each message is computed entirely from the value of its source vertex, the messages should have changed as well.

Example code:
{code:scala}
// Vector is breeze.linalg.Vector; rankGraph, zero, alpha and the nodeId
// helper are defined elsewhere in the program.
val prevRankGraph = rankGraph
val rankUpdates = prevRankGraph.aggregateMessages[Vector[Double]](
  ctx => {
    println(ctx.srcId.nodeId, " pushing value: " + ctx.srcAttr._2)
    // The message depends only on the source vertex value and the edge attribute.
    ctx.sendToDst(ctx.srcAttr._2 *:* ctx.attr)
  },
  (a: Vector[Double], b: Vector[Double]) => a +:+ b,
  TripletFields.Src
)
// The update function keeps the vertex type unchanged.
rankGraph = prevRankGraph.outerJoinVertices(rankUpdates) { case (vid, (deg, rsd), msgSumOpt) =>
  println(vid.nodeId, " value: " + rsd.toString())
  (deg, msgSumOpt.getOrElse(zero) *:* (1.0 - alpha))
}.cache()
{code}

As a temporary fix, I changed the data type inside 'outerJoinVertices' and then changed it back with 'mapVertices'. This suggests that the problem is caused by the activeSet being updated incorrectly when 'outerJoinVertices' keeps the data type of the vertices.
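The activeSet hypothesis can be illustrated with a simplified, stdlib-only Scala model. This is a sketch under assumptions and is not GraphX source. It assumes that when the update function keeps the vertex type, the new vertex values are compared with the old ones using '==', and that only vertices which compare unequal are copied to the edge-side replica read by 'aggregateMessages'. 'LengthOnlyVec', 'ValueVec' and 'step' are hypothetical names. 'LengthOnlyVec' imitates the 'equals' behaviour reported in this issue, where two vectors compare equal whenever their lengths match.

```scala
// Simplified model of type-preserving incremental replication (hypothetical,
// not GraphX source): only vertices whose new value compares unequal (==) to
// the old one are copied to the edge-side replica.
object IncrementalReplicationSketch {

  // Hypothetical vector whose equals mimics the reported behaviour:
  // two vectors are "equal" whenever their lengths match.
  final class LengthOnlyVec(val length: Int, val entries: Map[Int, Double]) {
    override def equals(other: Any): Boolean = other match {
      case v: LengthOnlyVec => v.length == length
      case _                => false
    }
    override def hashCode: Int = length
  }

  // Vector with structural equality, for comparison.
  final case class ValueVec(length: Int, entries: Map[Int, Double])

  /** Applies `update` to every vertex and returns (new vertex view, new replica). */
  def step[V](vertexView: Map[Long, V],
              replica: Map[Long, V],
              update: (Long, V) => V): (Map[Long, V], Map[Long, V]) = {
    val newView = vertexView.map { case (id, v) => id -> update(id, v) }
    // Ship only the vertices that look changed according to ==.
    val changed = newView.filter { case (id, v) => vertexView(id) != v }
    (newView, replica ++ changed)
  }
}
```

In this model, using 'LengthOnlyVec' to update vertex 2 from an empty vector to one holding (3, 0.4) changes the vertex view but leaves the replica stale, as the node-2 lines of the Executor1 log show. With 'ValueVec', the replica receives the new value.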
The following code works correctly:
{code:scala}
rankGraph = prevRankGraph.outerJoinVertices(rankUpdates) { case (vid, (deg, rsd), msgSumOpt) =>
  println(vid.nodeId, " value: " + rsd.toString())
  // Returning ((), ...) changes the vertex type; mapVertices then unwraps it.
  ((), (deg, msgSumOpt.getOrElse(zero) *:* (1.0 - alpha)))
}.mapVertices { case (_, (_, data)) => data }.cache()
{code}

After further testing, I found that the bug occurs because breeze.linalg.SparseVector.equals always returns true when the two vectors have the same dimension.

An example dataset and logs of the issue are given below.

Example graph (one edge per line: source and destination vertex IDs):
{code}
0 6
1 0
1 5
1 6
2 0
2 7
3 2
3 6
4 7
5 1
6 8
7 5
7 9
8 0
9 6
{code}

Log of Executor1:
{code}
(0, pushing value: SparseVector(10)())
(2, pushing value: SparseVector(10)())
(2, pushing value: SparseVector(10)())
(1, value: SparseVector(10)())
(2, value: SparseVector(10)())
(3, value: SparseVector(10)((3,1.0)))
(8, value: SparseVector(10)())
(0, pushing value: SparseVector(10)())
(2, pushing value: SparseVector(10)())
(2, pushing value: SparseVector(10)())
(1, value: SparseVector(10)())
#On the vertex view, the value of node-2 is updated in the second round.#
(2, value: SparseVector(10)((3,0.4)))
#On the vertex view, node-3 has no value in the second round.#
(3, value: SparseVector(10)())
(8, value: SparseVector(10)())
(0, pushing value: SparseVector(10)())
#On the edge view, node-2 has no value in the third round.#
(2, pushing value: SparseVector(10)())
(2, pushing value: SparseVector(10)())
(1, value: SparseVector(10)())
(2, value: SparseVector(10)((3,0.4)))
(3, value: SparseVector(10)())
(8, value: SparseVector(10)())
{code}

Log of Executor2:
{code}
(3, pushing value: SparseVector(10)((3,1.0)))
(3, pushing value: SparseVector(10)((3,1.0)))
(7, pushing value: SparseVector(10)())
(7, pushing value: SparseVector(10)())
(9, pushing value: SparseVector(10)())
(6, value: SparseVector(10)())
#On the edge view, the value of node-3 does not change in the second round.#
(3, pushing value: SparseVector(10)((3,1.0)))
(3, pushing value: SparseVector(10)((3,1.0)))
(7, pushing value: SparseVector(10)())
(7, pushing value: SparseVector(10)())
(9, pushing value: SparseVector(10)())
(6, value: SparseVector(10)((3,0.4)))
#The same error also happens in the third round.#
(3, pushing value: SparseVector(10)((3,1.0)))
(3, pushing value: SparseVector(10)((3,1.0)))
(7, pushing value: SparseVector(10)())
(7, pushing value: SparseVector(10)())
(9, pushing value: SparseVector(10)())
(6, value: SparseVector(10)((3,0.4)))
{code}

The logs of Executor3 and Executor4 are omitted because they are similar to the ones above.
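Another possible workaround, untested against GraphX, is to wrap the vertex attribute in a class that does not override 'equals'. Every attribute built by the update function is then a new object, so an equality-based change check treats every vertex as changed. The cost is that every vertex is shipped in every iteration, but the result no longer depends on Breeze's 'equals'. The sketch below assumes that GraphX decides which vertices changed by comparing them with '=='. 'Fresh' and 'treatedAsChanged' are hypothetical names.

```scala
// Hypothetical workaround sketch (untested against GraphX): a wrapper that keeps
// default reference equality, so every freshly built attribute looks "changed".
object IdentityWrapperSketch {

  /** Hypothetical wrapper. It does not override equals, so == compares object
    * identity. It extends Serializable so that Spark could ship it between executors. */
  final class Fresh[T](val value: T) extends Serializable

  /** Equality-based change check of the kind a type-preserving update relies on:
    * returns true if `after` would be treated as a new value. */
  def treatedAsChanged[V](before: V, after: V): Boolean = before != after
}
```

For example, two 'Fresh' wrappers around the same Scala 'Vector' count as changed, while the bare vectors do not. In the example code above, this would mean wrapping the (deg, rsd) tuple in 'new Fresh(...)' inside 'outerJoinVertices' and reading '.value' inside 'aggregateMessages'.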