Seth Bromberger created SPARK-17097:
---------------------------------------

             Summary: Pregel does not keep vertex state properly; fails to 
terminate 
                 Key: SPARK-17097
                 URL: https://issues.apache.org/jira/browse/SPARK-17097
             Project: Spark
          Issue Type: Bug
          Components: GraphX
    Affects Versions: 1.6.0
         Environment: Scala 2.10.5, Spark 1.6.0 with GraphX and Pregel
            Reporter: Seth Bromberger


Consider the following minimal example:
{code:title=PregelBug.scala|borderStyle=solid}
package testGraph

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph, _}

/**
 * Minimal reproduction for SPARK-17097.
 *
 * Builds a GraphX graph with two vertices and one edge, then runs Pregel over
 * it so that labels held by one endpoint of an edge are sent to the other.
 * As reported, the run does not terminate while TestVertex is a case class,
 * and behaves as expected once the `case` modifier is removed.
 */
object PregelBug {
  /**
   * Creates a local SparkContext, builds the test graph, runs Pregel and
   * prints the labels of every vertex in the resulting graph.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]) = {
    //FIXME breaks if TestVertex is a case class; works if not case class
    // Vertex attribute: an id, a string value and a MUTABLE set of labels.
    // The vals below only re-expose the constructor parameters under other
    // names; `labels` is the very same HashSet instance that was passed in.
    case class TestVertex(inId: VertexId,
                     inData: String,
                     inLabels: collection.mutable.HashSet[String]) extends 
Serializable {
      val id = inId
      val value = inData
      val labels = inLabels
    }

    // Edge attribute: source vertex id, destination vertex id and a data string.
    class TestLink(inSrc: VertexId, inDst: VertexId, inData: String) extends 
Serializable  {
      val src = inSrc
      val dst = inDst
      val data = inData
    }

    // Sentinel used as the initial Pregel message; vertexProgram handles it
    // separately from ordinary label messages.
    val startString = "XXXSTARTXXX"

    val conf = new SparkConf().setAppName("pregeltest").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Two vertices, both starting with an empty label set.
    val vertexes = Vector(
      new TestVertex(0, "label0", collection.mutable.HashSet[String]()),
      new TestVertex(1, "label1", collection.mutable.HashSet[String]())
    )
    // A single edge 0 -> 1.
    val links = Vector(
      new TestLink(0, 1, "linkData01")
    )
    // Shape the data the way Graph(...) expects it: (id, attribute) pairs for
    // vertices and Edge(src, dst, attribute) for edges.
    val vertexes_packaged = vertexes.map(v => (v.id, v))
    val links_packaged = links.map(e => Edge(e.src, e.dst, e))

    val graph = Graph[TestVertex, TestLink](sc.parallelize(vertexes_packaged), 
sc.parallelize(links_packaged))

    // Vertex program: folds the received messages into the vertex's label set
    // and returns a TestVertex built from the same id, value and label set.
    //
    // NOTE(review): vdata.labels is modified in place, and the returned
    // TestVertex wraps that same HashSet instance. With case-class
    // (structural) equality the result therefore compares equal to `vdata`.
    // Presumably this is related to the reported non-termination -- confirm
    // against the GraphX Pregel implementation.
    def vertexProgram (vertexId: VertexId, vdata: TestVertex, message: 
Vector[String]): TestVertex = {
      message.foreach {
        // Backticks match against the VALUE of startString rather than binding
        // a new variable. Only vertex 0 seeds its label set with its own value;
        // every other vertex ignores the start sentinel.
        case `startString` =>
          if (vdata.id == 0L)
            vdata.labels.add(vdata.value)

        // Any other message is a label: add it if it is not already present.
        case m =>
          if (!vdata.labels.contains(m))
            vdata.labels.add(m)
      }
      new TestVertex(vdata.id, vdata.value, vdata.labels)
    }

    // Message generator for one edge triplet: emits one single-label message
    // per label, addressed by the id stored in the receiving vertex attribute.
    def sendMessage (triplet: EdgeTriplet[TestVertex, TestLink]): 
Iterator[(VertexId, Vector[String])] = {
      val srcLabels = triplet.srcAttr.labels
      val dstLabels = triplet.dstAttr.labels

      // Labels the source has and the destination lacks, sent to the destination.
      val msgsSrcDst = srcLabels.diff(dstLabels)
        .map(label => (triplet.dstAttr.id, Vector[String](label)))

      // NOTE(review): `dstLabels.diff(dstLabels)` is a set minus itself, so
      // this collection is always empty and nothing is ever sent to the
      // source. `dstLabels.diff(srcLabels)` was presumably intended -- confirm.
      val msgsDstSrc = dstLabels.diff(dstLabels)
        .map(label => (triplet.srcAttr.id, Vector[String](label)))

      msgsSrcDst.toIterator ++ msgsDstSrc.toIterator
    }

    // Message combiner: concatenates both vectors and drops duplicate labels.
    def mergeMessage (m1: Vector[String], m2: Vector[String]): Vector[String] = 
m1.union(m2).distinct

    // Run Pregel with Vector(startString) as the initial message.
    val g = graph.pregel(Vector[String](startString))(vertexProgram, 
sendMessage, mergeMessage)

    println("---pregel done---")
    println("vertex info:")
    // Print the vertex id (both the graph key and the attribute's own id) and
    // the label set of each vertex in the result graph.
    g.vertices.foreach(
      v => {
        val labels = v._2.labels
        println(
          "vertex " + v._1 +
            ": name = " + v._2.id +
            ", labels = " + labels)
      }
    )
  }
}
{code}


This code never terminates, even though we expect it to. As a workaround, we 
simply remove the "case" modifier from the TestVertex class (see FIXME comment), 
and then it behaves as expected.

(Apologies if this has been fixed in later versions; we're unfortunately pegged 
to 2.10.5 / 1.6.0 for now.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to