[ 
https://issues.apache.org/jira/browse/SPARK-18916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15759870#comment-15759870
 ] 

Seth Bromberger commented on SPARK-18916:
-----------------------------------------

Update: https://issues.scala-lang.org/browse/SI-9895 appears to be 
what hit me.
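
If the `merged` call in the quoted mergeMsg is what trips the error, one possible workaround is to compute the same per-key minimum without calling `merged` at all. The following is a minimal sketch, not a verified fix: `ParentDist` is not shown in the report, so the case class and its `min` here are hypothetical stand-ins, and `mergeByMin` is an illustrative name. It has not been run against the failing Pregel job.

```scala
import scala.collection.immutable.HashMap

// Hypothetical stand-in for ParentDist, which the report does not define.
final case class ParentDist(parent: Long, dist: Long) {
  // Assumption: "min" keeps whichever entry has the smaller distance.
  def min(that: ParentDist): ParentDist = if (that.dist < dist) that else this
}

object MergeSketch {
  // VertexId is an alias for Long in GraphX, so Long is used directly here.
  type Msg = HashMap[Long, ParentDist]

  // Same per-key minimum as mergeMsg, built with get/updated instead of merged.
  def mergeByMin(m1: Msg, m2: Msg): Msg =
    m2.foldLeft(m1) { case (acc, (k, v2)) =>
      acc.get(k) match {
        case Some(v1) => acc.updated(k, v1.min(v2)) // key in both maps: keep the cheaper entry
        case None     => acc.updated(k, v2)         // key only in m2: take it as-is
      }
    }
}
```

Folding over one map and updating the other costs more than a structural merge, but it only touches the public `get` and `updated` operations.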

> Possible bug in Pregel / mergeMsg with hashmaps
> -----------------------------------------------
>
>                 Key: SPARK-18916
>                 URL: https://issues.apache.org/jira/browse/SPARK-18916
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX
>    Affects Versions: 2.0.2
>         Environment: OSX / IntelliJ IDEA 2016.3 CE EAP, Scala 2.11.8, Spark 2.0.2
>            Reporter: Seth Bromberger
>              Labels: error, graphx, pregel
>
> Consider the following (rough) code that attempts to calculate all-pairs 
> shortest paths via Pregel:
> {code:java}
>     def allPairsShortestPaths: RDD[(VertexId, HashMap[VertexId, ParentDist])] = {
>       val initialMsg = HashMap(-1L -> ParentDist(-1L, -1L))
>       val pregelg = g.mapVertices((vid, vd) => (vd, HashMap[VertexId, ParentDist](vid -> ParentDist(vid, 0L)))).reverse
>       def vprog(v: VertexId, value: (VD, HashMap[VertexId, ParentDist]), message: HashMap[VertexId, ParentDist]): (VD, HashMap[VertexId, ParentDist]) = {
>         val updatedValues = mm2(value._2, message).filter(v => v._2.dist >= 0)
>         (value._1, updatedValues)
>       }
>       def sendMsg(triplet: EdgeTriplet[(VD, HashMap[VertexId, ParentDist]), ED]): Iterator[(VertexId, HashMap[VertexId, ParentDist])] = {
>         val dstVertexId = triplet.dstId
>         val srcMap = triplet.srcAttr._2
>         val dstMap = triplet.dstAttr._2  // guaranteed to have dstVertexId as a key
>         val updatesToSend : HashMap[VertexId, ParentDist] = srcMap.filter {
>           case (vid, srcPD) => dstMap.get(vid) match {
>             case Some(dstPD) => dstPD.dist > srcPD.dist + 1   // if it exists, is it cheaper?
>             case _ => true // not found - new update
>           }
>         }.map(u => u._1 -> ParentDist(triplet.srcId, u._2.dist +1))
>         if (updatesToSend.nonEmpty)
>           Iterator[(VertexId, HashMap[VertexId, ParentDist])]((dstVertexId, updatesToSend))
>         else
>           Iterator.empty
>       }
>       def mergeMsg(m1: HashMap[VertexId, ParentDist], m2: HashMap[VertexId, ParentDist]): HashMap[VertexId, ParentDist] = {
>         // when the following two lines are commented out, the program fails with
>         // 16/12/17 19:53:50 INFO DAGScheduler: Job 24 failed: reduce at VertexRDDImpl.scala:88, took 0.244042 s
>         // Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1099.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1099.0 (TID 129, localhost): scala.MatchError: (null,null) (of class scala.Tuple2)
>         m1.foreach(_ => ())
>         m2.foreach(_ => ())
>         m1.merged(m2) {
>           case ((k1, v1), (_, v2)) => (k1, v1.min(v2))
>         }
>       }
>       // mm2 is here just to provide a separate function for vprog. Ideally we'd just re-use mergeMsg.
>       def mm2(m1: HashMap[VertexId, ParentDist], m2: HashMap[VertexId, ParentDist]): HashMap[VertexId, ParentDist] = {
>         m1.merged(m2) {
>           case ((k1, v1), (_, v2)) => (k1, v1.min(v2))
>           case n => throw new Exception("we've got a problem: " + n)
>         }
>       }
>       val pregelRun = pregelg.pregel(initialMsg)(vprog, sendMsg, mergeMsg)
>       val sps = pregelRun.vertices.map(v => v._1 -> v._2._2)
>       sps
>     }
>   }
> {code}
> Note the comment in the mergeMsg function: when the messages are explicitly 
> accessed before the .merged call, the code works. If these side-effecting 
> statements are removed or commented out, the error shown in the comment is 
> raised.
> This fails consistently on a 50-node undirected cycle graph.
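
For anyone trying to reproduce this, the test graph itself is not included in the report. The sketch below shows one way the edge list of a 50-node cycle could be generated in plain Scala. It assumes that "undirected" means each edge is stored in both directions and that vertex ids run from 0 to n-1; `cycleEdges` is an illustrative name, not something from the original code.

```scala
object CycleGraphSketch {
  // Edge pairs for an n-node cycle. Each undirected edge is emitted in both
  // directions, which is an assumption about how the original graph was built.
  def cycleEdges(n: Int): Seq[(Long, Long)] =
    (0 until n).flatMap { i =>
      val a = i.toLong
      val b = ((i + 1) % n).toLong // wraps around so the last vertex links back to vertex 0
      Seq((a, b), (b, a))
    }
}
```

With a SparkContext available, the result of `CycleGraphSketch.cycleEdges(50)` could presumably be parallelized and passed to GraphX's `Graph.fromEdgeTuples` to obtain a graph to run the function above on.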



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
