Hi, I'm currently using GraphX for some analysis and have run into a bit of a
hurdle. If I use my test dataset of 20 nodes and about 30 links, it runs really
quickly. I have two other datasets, one of 10 million links and one of
20 million. When I create my graphs everything seems to work okay, and I can get
a count of the vertices in around 10 minutes (12-node cluster with 192 cores and
256 GB of RAM on each node). Code below for this part:
import org.apache.spark.graphx._
import java.util.HashMap
import scala.collection.JavaConversions._
import org.apache.spark.rdd.RDD

// create the source graph from HDFS
case class Entity(Active: String, WA_Ind: String, Ent_Typ: String, Mkt_Seg: String, Status: String)

// read the vertex information from HDFS
val vRDD: RDD[(Long, Entity)] =
  sc.parallelize(sc.textFile("hdfs://Data/entities_2011.csv", 1000).
    map(_.split(",")).
    map(x => (x(0).toLong, Entity(x(1), x(2), x(3), x(4), "A"))).
    collect)

// read the edge information from HDFS
// (endpoints parsed as Long to match the vertex ids)
val eRDD = sc.parallelize(sc.textFile("hdfs://Data/links_2011.csv", 1000).
  map(_.split(",")).
  map(x => Edge(x(0).toLong, x(1).toLong, 1)).
  collect)

val sourceGraph: Graph[Entity, Int] = Graph(vRDD, eRDD).cache()

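For reference, here is a variant of the load I could switch to that keeps
everything as RDD transformations, with no collect/parallelize round trip
through the driver (just a sketch; vRDD2, eRDD2 and sourceGraphAlt are
illustrative names, and it assumes the same CSV layout as above):

// sketch: the same load kept fully distributed (nothing collected to the driver)
val vRDD2: RDD[(Long, Entity)] =
  sc.textFile("hdfs://Data/entities_2011.csv", 1000).
    map(_.split(",")).
    map(x => (x(0).toLong, Entity(x(1), x(2), x(3), x(4), "A")))

val eRDD2: RDD[Edge[Int]] =
  sc.textFile("hdfs://Data/links_2011.csv", 1000).
    map(_.split(",")).
    map(x => Edge(x(0).toLong, x(1).toLong, 1))

val sourceGraphAlt: Graph[Entity, Int] = Graph(vRDD2, eRDD2).cache()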


The problem, though, is that when I use Pregel to do some work on the graph it
never seems to finish. The really small set (20 vertices) is fine and the
10 million works, but the 20 million never finishes. Code below. Any
assistance appreciated.

// distance map to hold the network root and the node's level.
// I updated the pregel functions to include the node's market segment:
// the distance map was HashMap[VertexId, Int], and I changed
//   initMap.put(vid, 0)
// to
//   initMap.put((vid, prevAttr.Entity.Mkt_Seg), 0)
type DistanceMap = HashMap[(VertexId, String), Int]

// new vertex class with placeholders for the extra information
case class Node(Entity: Entity, Parent: Int, inDeg: Int, outDeg: Int, Distance: DistanceMap)

val sourceGraph2 = sourceGraph.mapVertices { case (id, entity) =>
  Node(entity, 0, 0, 0, new DistanceMap)
}

// seed each root vertex (in-degree 0) with distance 0 to itself
val initDists: Graph[DistanceMap, Int] =
  sourceGraph2.outerJoinVertices(sourceGraph2.inDegrees) {
    (vid, prevAttr, inDeg) =>
      val initMap = new DistanceMap
      if (inDeg.getOrElse(0) == 0) {
        initMap.put((vid, prevAttr.Entity.Mkt_Seg), 0)
      }
      initMap
  }.cache()
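
As a sanity check on the seeding, I can count how many vertices got a
non-empty initial map (a sketch; rootCount is just an illustrative name):

// sketch: count the root vertices that were seeded with distance 0
val rootCount = initDists.vertices.filter { case (_, m) => !m.isEmpty }.count()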

def sendMsg(edge: EdgeTriplet[DistanceMap, Int]): Iterator[(VertexId, DistanceMap)] = {
  val updatedDists = new DistanceMap
  edge.srcAttr.foreach { case (source, dist) =>
    // only send if the destination hasn't seen this root yet,
    // or we have found a shorter path to it
    if (!edge.dstAttr.containsKey(source) || edge.dstAttr.get(source) > dist + 1) {
      updatedDists.put(source, dist + 1)
    }
  }
  if (!updatedDists.isEmpty) {
    Iterator((edge.dstId, updatedDists))
  } else {
    Iterator.empty
  }
}

def mergeMsg(a: DistanceMap, b: DistanceMap): DistanceMap = {
  // keep the minimum distance per (root, market segment) key
  val merged = new DistanceMap(a)
  b.foreach { case (source, dist) =>
    if (merged.containsKey(source)) {
      merged.put(source, math.min(merged.get(source), dist))
    } else {
      merged.put(source, dist)
    }
  }
  merged
}

def vprog(vid: VertexId, curDists: DistanceMap, newDists: DistanceMap): DistanceMap = {
  mergeMsg(curDists, newDists)
}

val dists = initDists.pregel[DistanceMap](new DistanceMap)(vprog, sendMsg, mergeMsg)
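
To inspect the result once (if) it finishes, I would run something like this
sketch (output names illustrative):

// sketch: sample a few vertices and the number of (root, segment) entries each ended up with
dists.vertices.map { case (id, m) => (id, m.size) }.take(5).foreach(println)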






