[ https://issues.apache.org/jira/browse/SPARK-18916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15760630#comment-15760630 ]
Sean Owen commented on SPARK-18916:
-----------------------------------

OK, is this then just a Scala problem to be fixed in 2.11.9 / 2.12?

> Possible bug in Pregel / mergeMsg with hashmaps
> -----------------------------------------------
>
>                 Key: SPARK-18916
>                 URL: https://issues.apache.org/jira/browse/SPARK-18916
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX
>    Affects Versions: 2.0.2
>         Environment: OSX / IntelliJ IDEA 2016.3 CE EAP, Scala 2.11.8, Spark 2.0.2
>            Reporter: Seth Bromberger
>              Labels: error, graphx, pregel
>
> Consider the following (rough) code that attempts to calculate all-pairs
> shortest paths via pregel:
> {code:java}
> def allPairsShortestPaths: RDD[(VertexId, HashMap[VertexId, ParentDist])] = {
>   val initialMsg = HashMap(-1L -> ParentDist(-1L, -1L))
>   val pregelg = g.mapVertices((vid, vd) => (vd, HashMap[VertexId, ParentDist](vid -> ParentDist(vid, 0L)))).reverse
>   def vprog(v: VertexId, value: (VD, HashMap[VertexId, ParentDist]), message: HashMap[VertexId, ParentDist]): (VD, HashMap[VertexId, ParentDist]) = {
>     val updatedValues = mm2(value._2, message).filter(v => v._2.dist >= 0)
>     (value._1, updatedValues)
>   }
>   def sendMsg(triplet: EdgeTriplet[(VD, HashMap[VertexId, ParentDist]), ED]): Iterator[(VertexId, HashMap[VertexId, ParentDist])] = {
>     val dstVertexId = triplet.dstId
>     val srcMap = triplet.srcAttr._2
>     val dstMap = triplet.dstAttr._2 // guaranteed to have dstVertexId as a key
>     val updatesToSend : HashMap[VertexId, ParentDist] = srcMap.filter {
>       case (vid, srcPD) => dstMap.get(vid) match {
>         case Some(dstPD) => dstPD.dist > srcPD.dist + 1 // if it exists, is it cheaper?
>         case _ => true // not found - new update
>       }
>     }.map(u => u._1 -> ParentDist(triplet.srcId, u._2.dist +1))
>     if (updatesToSend.nonEmpty)
>       Iterator[(VertexId, HashMap[VertexId, ParentDist])]((dstVertexId, updatesToSend))
>     else
>       Iterator.empty
>   }
>   def mergeMsg(m1: HashMap[VertexId, ParentDist], m2: HashMap[VertexId, ParentDist]): HashMap[VertexId, ParentDist] = {
>     // when the following two lines are commented out, the program fails with
>     // 16/12/17 19:53:50 INFO DAGScheduler: Job 24 failed: reduce at VertexRDDImpl.scala:88, took 0.244042 s
>     // Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1099.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1099.0 (TID 129, localhost): scala.MatchError: (null,null) (of class scala.Tuple2)
>     m1.foreach(_ => ())
>     m2.foreach(_ => ())
>     m1.merged(m2) {
>       case ((k1, v1), (_, v2)) => (k1, v1.min(v2))
>     }
>   }
>   // mm2 is here just to provide a separate function for vprog. Ideally we'd just re-use mergeMsg.
>   def mm2(m1: HashMap[VertexId, ParentDist], m2: HashMap[VertexId, ParentDist]): HashMap[VertexId, ParentDist] = {
>     m1.merged(m2) {
>       case ((k1, v1), (_, v2)) => (k1, v1.min(v2))
>       case n => throw new Exception("we've got a problem: " + n)
>     }
>   }
>   val pregelRun = pregelg.pregel(initialMsg)(vprog, sendMsg, mergeMsg)
>   val sps = pregelRun.vertices.map(v => v._1 -> v._2._2)
>   sps
> }
> }
> {code}
> Note the comment in the mergeMsg function: when the messages are explicitly
> accessed prior to the .merged statement, the code works. If these side-effect
> statements are removed / commented out, the error message in the comments is
> generated.
> This fails consistently on a 50-node undirected cyclegraph.
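
A possible stopgap, sketched under assumptions: if the failure is specific to HashMap.merged (which the foreach workaround in the description hints at, but which is not confirmed here), the merge could be written as a fold over updated instead, so no user-supplied merge function ever sees the internal key/value pairs. ParentDist is not shown in the report, so the case class below is a hypothetical stand-in with a guessed min, and MergeSketch is only an illustrative wrapper. This has not been run against the failing Pregel job.

```scala
import scala.collection.immutable.HashMap

object MergeSketch {
  // GraphX defines VertexId as an alias for Long; repeated here so the
  // sketch compiles without Spark on the classpath.
  type VertexId = Long

  // Hypothetical stand-in: the report does not include ParentDist's definition.
  final case class ParentDist(parent: VertexId, dist: Long) {
    // Assumed semantics: keep whichever entry has the smaller distance.
    def min(that: ParentDist): ParentDist = if (that.dist < dist) that else this
  }

  // Merge two message maps without calling HashMap.merged:
  // fold every entry of m2 into m1, keeping the smaller distance per key.
  def mergeMsg(m1: HashMap[VertexId, ParentDist],
               m2: HashMap[VertexId, ParentDist]): HashMap[VertexId, ParentDist] =
    m2.foldLeft(m1) { case (acc, (k, v2)) =>
      acc.updated(k, acc.get(k).fold(v2)(_.min(v2)))
    }
}
```

The fold touches each entry of m2 once, so it should cost roughly the same as the foreach-then-merged workaround; whether it actually sidesteps the MatchError would need to be checked on the 50-node reproduction.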