Dear list,

Hi, I am new to Spark and GraphX, and I have a little experience with Scala. I want to use GraphX to compute some basic statistics on linked open data, which is essentially a graph. Suppose the graph contains only one type of edge, directed from individuals to concepts, and every edge is labeled "type". I want to find all pairs of concepts that have at least one individual linking to both of them.

Below is my current solution, which unfortunately does not work. Could you please help me fix it? Or is there a better approach? Any help is appreciated!

val conf = new SparkConf().setMaster("spark://MacBook-Pro:7077").setAppName("My App").setJars(...)
val sc = new SparkContext(conf)

// initialize individuals (in small letters) and concepts (in upper case letters)
val users: RDD[(org.apache.spark.graphx.VertexId, String)] =
  sc.parallelize(Array((1L, "a"), (2L, "b"), (3L, "e"),
    (11L, "A"), (12L, "B"), (13L, "C"), (14L, "D")))

// initialize "type" edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(
    Edge(1L, 11L, "type"), Edge(1L, 14L, "type"), Edge(1L, 13L, "type"),
    Edge(2L, 11L, "type"), Edge(2L, 12L, "type"),
    Edge(3L, 11L, "type"), Edge(3L, 13L, "type")))

val graph = Graph(users, relationships)
val indTypes = graph.collectNeighborIds(EdgeDirection.Out)

// seems to be stupid functions...
def mapUDF(triplet: EdgeContext[String, String, HashMap[Long, Int]]) = {
  val msg = indTypes.filter(pred => pred._1 == triplet.srcId).first()._2
    .aggregate(new HashMap[Long, Int])((a, b) => a.+=(b -> 1), (a, b) => a ++ b)
  triplet.sendToDst(msg)
}

def reduceUDF(a: HashMap[Long, Int], b: HashMap[Long, Int]): HashMap[Long, Int] =
  a ++ b.map { case (k, v) => k -> (v + a.getOrElse(k, 0)) }

var pairs = new HashMap[(Long, Long), Int]
val results = graph.aggregateMessages[HashMap[Long, Int]](mapUDF, reduceUDF)

results.foreach(result => {
  result._2.filter(p => p._1 != result._1).foreach(map => {
    val a = result._1
    val b = map._1
    if (!pairs.contains(a, b) && !pairs.contains(b, a))
      pairs += (a, b) -> map._2
  })
})
pairs.foreach(println(_))

The exceptions:

TaskSetManager: Lost task 1.0 in stage 3.0 (TID 7, 10.42.0.17): java.lang.NullPointerException
    at atur.node.learner.Test$.mapUDF(SimpleDisjointnessLearner.scala:147)
    at atur.node.learner.Test$$anonfun$4.apply(SimpleDisjointnessLearner.scala:155)

Best wishes,
June
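
P.S. To make the intended result concrete, here is a minimal sketch in plain Scala collections (no Spark or GraphX involved) of what I expect for the sample "type" edges above. It only illustrates the expected output and is not meant as a distributed solution; the names typeEdges, conceptsByIndividual and pairCounts are made up for this example, and it can be pasted into the Scala REPL or spark-shell.

```scala
// Plain Scala collections, no Spark: the (individual, concept) "type" edges
// from the sample graph above.
val typeEdges: Seq[(Long, Long)] = Seq(
  (1L, 11L), (1L, 14L), (1L, 13L),
  (2L, 11L), (2L, 12L),
  (3L, 11L), (3L, 13L))

// Collect the distinct concepts each individual links to, sorted by id.
val conceptsByIndividual: Map[Long, Seq[Long]] =
  typeEdges.groupBy(_._1).map { case (individual, edges) =>
    individual -> edges.map(_._2).distinct.sorted
  }

// For every individual, emit each unordered concept pair (smaller id first),
// then count how many individuals link to both concepts of the pair.
val pairCounts: Map[(Long, Long), Int] =
  conceptsByIndividual.values.toSeq
    .flatMap(concepts => concepts.combinations(2).map { case Seq(a, b) => (a, b) })
    .groupBy(identity)
    .map { case (pair, occurrences) => pair -> occurrences.size }

pairCounts.toSeq.sorted.foreach(println)
```

For the sample data, the pairs I expect are (11,12), (11,13), (11,14) and (13,14), where (11,13) is shared by two individuals (1 and 3) and each of the others by one.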