Hi, I'm currently using GraphX for some analysis and have hit a bit of a hurdle. With my test dataset of 20 nodes and about 30 links it runs really quickly. I have two other datasets: one of 10 million links and one of 20 million. Graph creation seems to work okay, and I can get a count of the vertices in around 10 minutes (12-node cluster with 192 cores and 256 GB RAM on each node). Code below for this part:

import org.apache.spark.graphx._
import java.util.HashMap
import scala.collection.JavaConversions._
import org.apache.spark.rdd.RDD

// create the source graph from HDFS
case class Entity(Active: String, WA_Ind: String, Ent_Typ: String, Mkt_Seg: String, Status: String)

// read the vertex information from HDFS
val vRDD: RDD[(Long, Entity)] = sc.parallelize(
  sc.textFile("hdfs://Data/entities_2011.csv", 1000).
    map(x => x.split(",")).
    map(x => (x(0).toLong, new Entity(x(1), x(2), x(3), x(4), "A"))).
    collect)

// read the edge information from HDFS
val eRDD = sc.parallelize(
  sc.textFile("hdfs://Data/links_2011.csv", 1000).
    map(x => Edge(x.split(",")(0).toInt, x.split(",")(1).toInt, 1)).
    collect)

val sourceGraph: Graph[Entity, Int] = Graph(vRDD, eRDD).cache()
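One thing I'm not sure about is the collect/parallelize round trip: sc.textFile already returns a distributed RDD, so collecting it to the driver and re-parallelizing pulls all 20 million links through the driver first. A minimal sketch of the same load with the round trip removed (assuming the same CSV layout; it would replace the vRDD/eRDD definitions above):

// keep the data distributed end to end; no collect() to the driver
val vRDD: RDD[(Long, Entity)] =
  sc.textFile("hdfs://Data/entities_2011.csv", 1000).
    map(x => x.split(",")).
    map(x => (x(0).toLong, Entity(x(1), x(2), x(3), x(4), "A")))

val eRDD: RDD[Edge[Int]] =
  sc.textFile("hdfs://Data/links_2011.csv", 1000).
    map(x => x.split(",")).
    map(x => Edge(x(0).toLong, x(1).toLong, 1))

val sourceGraph: Graph[Entity, Int] = Graph(vRDD, eRDD).cache()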
// distance map to hold the network root and the node's level.
// The Pregel functions were updated to include each node's market segment:
// the distance map was HashMap[VertexId, Int], and
//   initMap.put(vid, 0)
// changed to
//   initMap.put((vid, prevAttr.Entity.Mkt_Seg), 0)
type DistanceMap = HashMap[(VertexId, String), Int]

// new class with placeholders for the per-vertex information
case class Node(Entity: Entity, Parent: Int, inDeg: Int, outDeg: Int, Distance: DistanceMap)

//val sourceGraph2: Graph[Node, Int] = sourceGraph.mapVertices { case (id, entity) => Node(entity, 0, 0, 0, new DistanceMap) }
val sourceGraph2 = sourceGraph.mapVertices { case (id, entity) => Node(entity, 0, 0, 0, new DistanceMap) }

The problem, though, is that when I use Pregel to do some work on the graph it never seems to finish. The really small set (20 vertices) is fine and the 10-million-link set works, but the 20-million one never finishes. Code below; any assistance appreciated.

val initDists: Graph[DistanceMap, Int] = sourceGraph2.outerJoinVertices(sourceGraph2.inDegrees) {
  (vid, prevAttr, inDeg) =>
    val initMap = new DistanceMap
    if (inDeg.getOrElse(0) == 0) {
      initMap.put((vid, prevAttr.Entity.Mkt_Seg), 0)
    }
    initMap
}.cache()

def sendMsg(edge: EdgeTriplet[DistanceMap, Int]): Iterator[(VertexId, DistanceMap)] = {
  val updatedDists = new DistanceMap
  edge.srcAttr.foreach { case (source, dist) =>
    if (!edge.dstAttr.containsKey(source) || edge.dstAttr.get(source) > dist + 1) {
      updatedDists.put(source, dist + 1)
    }
  }
  if (!updatedDists.isEmpty) Iterator((edge.dstId, updatedDists)) else Iterator.empty
}

def mergeMsg(a: DistanceMap, b: DistanceMap): DistanceMap = {
  val merged = new DistanceMap(a)
  b.foreach { case (source, dist) =>
    if (merged.containsKey(source)) {
      merged.put(source, math.min(merged.get(source), dist))
    } else {
      merged.put(source, dist)
    }
  }
  merged
}

def vprog(vid: VertexId, curDists: DistanceMap, newDists: DistanceMap): DistanceMap = {
  mergeMsg(curDists, newDists)
}

val dists = initDists.pregel[DistanceMap](new DistanceMap)(vprog, sendMsg, mergeMsg)
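One thing I've been wondering: graph.pregel defaults to maxIterations = Int.MaxValue and activeDirection = EdgeDirection.Either, and my sendMsg only ever sends along src -> dst. Would something like the sketch below at least stop the job from running unbounded? (maxHops is just a hypothetical cap I picked; it would need to exceed the longest root-to-leaf path.)

// bound the superstep count and restrict the active edge direction;
// messages in sendMsg only travel src -> dst, so EdgeDirection.Out matches
val maxHops = 50  // hypothetical cap on path length
val dists = initDists.pregel[DistanceMap](new DistanceMap, maxHops, EdgeDirection.Out)(
  vprog, sendMsg, mergeMsg)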
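Relatedly, I wonder whether the maps themselves are the problem: initDists only seeds an entry at zero-in-degree vertices, and each vertex then accumulates one entry per root it is reachable from, so the per-vertex maps (and therefore the messages) grow with how many roots reach each vertex. The 20-million-link graph could be much more connected than the 10-million one. A quick check of how many roots get seeded (hypothetical diagnostic, using the initDists above):

// count the vertices that were seeded as roots (non-empty initial map)
val numRoots = initDists.vertices.filter { case (_, m) => !m.isEmpty }.count()
println(s"zero-in-degree roots: $numRoots")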