Dear list,
Hi~I am new to spark and graphx, and I have a few experiences using scala. I 
want to use graphx to calculate some basic statistics in linked open data, 
which is basically a graph. 
Suppose the graph only contains one type of edge, directing from individuals to 
concepts, and the edge labels are all "type". I want to find all pairs of 
concepts that have at least one individual linking to both of them.The 
following is my current solution, but sadly doesn't work.
Could you please help me work this out? Or are there better solutions? Any 
helps are appreciated! 

  val conf = new 
SparkConf().setMaster("spark://MacBook-Pro:7077").setAppName("My 
App").setJars(...)  val sc = new SparkContext(conf)// initialize individuals 
(in small letters) and concepts (in upper case letters)  val users: 
RDD[(org.apache.spark.graphx.VertexId, String)] =    sc.parallelize(Array((1L, 
"a"), (2L, "b"),      (3L, "e"), (11L, "A"), (12L, "B"), (13L, "C"), (14L, 
"D")))  // initialize "type" edges  val relationships: RDD[Edge[String]] =    
sc.parallelize(Array(      Edge(1L, 11L, "type"), Edge(1L, 14L, "type"), 
Edge(1L, 13L, "type"),      Edge(2L, 11L, "type"), Edge(2L, 12L, "type"),      
Edge(3L, 11L, "type"), Edge(3L, 13L, "type")))   val graph = Graph(users, 
relationships)  val indTypes = graph.collectNeighborIds(EdgeDirection.Out)
// seems to be stupid functions...  def mapUDF(triplet: EdgeContext[String, 
String, HashMap[Long, Int]]) = {    val msg = indTypes.filter(pred => pred._1 
== triplet.srcId).first()._2.aggregate(new HashMap[Long, Int])((a, b) => a.+=(b 
-> 1), (a, b) => a ++ b)    triplet.sendToDst(msg)  }  def reduceUDF(a: 
HashMap[Long, Int], b: HashMap[Long, Int]): HashMap[Long, Int] = a ++ b.map { 
case (k, v) => k -> (v + a.getOrElse(k, 0)) }    var pairs = new HashMap[(Long, 
Long), Int]  val results = graph.aggregateMessages[HashMap[Long, Int]](    
mapUDF, reduceUDF)
  results.foreach(result => {      result._2.filter(p => p._1 != 
result._1).foreach(map => {        val a = result._1        val b = map._1      
  if (!pairs.contains(a, b) && !pairs.contains(b, a))          pairs += (a, b) 
-> map._2      })    })  pairs.foreach(println(_))
The exceptions:
TaskSetManager: Lost task 1.0 in stage 3.0 (TID 7, 10.42.0.17): 
java.lang.NullPointerExceptionat 
atur.node.learner.Test$.mapUDF(SimpleDisjointnessLearner.scala:147)at 
atur.node.learner.Test$$anonfun$4.apply(SimpleDisjointnessLearner.scala:155)

best wishes,June

Reply via email to