Dear list,
Hi! I am new to Spark and GraphX, and I have some experience with Scala. I
want to use GraphX to compute some basic statistics over Linked Open Data,
which is essentially a graph.
Suppose the graph contains only one kind of edge, directed from individuals to
concepts, and every edge is labeled "type". I want to find all pairs of
concepts that have at least one individual linking to both of them. The
following is my current attempt, but sadly it doesn't work.
Could you please help me figure this out? Or is there a better approach? Any
help is appreciated!
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
import scala.collection.mutable.HashMap

val conf = new SparkConf()
  .setMaster("spark://MacBook-Pro:7077")
  .setAppName("My App")
  .setJars(...)
val sc = new SparkContext(conf)

// initialize individuals (lower-case letters) and concepts (upper-case letters)
val users: RDD[(org.apache.spark.graphx.VertexId, String)] = sc.parallelize(Array(
  (1L, "a"), (2L, "b"), (3L, "e"),
  (11L, "A"), (12L, "B"), (13L, "C"), (14L, "D")))

// initialize "type" edges
val relationships: RDD[Edge[String]] = sc.parallelize(Array(
  Edge(1L, 11L, "type"), Edge(1L, 14L, "type"), Edge(1L, 13L, "type"),
  Edge(2L, 11L, "type"), Edge(2L, 12L, "type"),
  Edge(3L, 11L, "type"), Edge(3L, 13L, "type")))

val graph = Graph(users, relationships)
val indTypes = graph.collectNeighborIds(EdgeDirection.Out)

// seems to be a stupid function...
def mapUDF(triplet: EdgeContext[String, String, HashMap[Long, Int]]) = {
  val msg = indTypes.filter(pred => pred._1 == triplet.srcId).first()._2
    .aggregate(new HashMap[Long, Int])((a, b) => a += (b -> 1), (a, b) => a ++ b)
  triplet.sendToDst(msg)
}

def reduceUDF(a: HashMap[Long, Int], b: HashMap[Long, Int]): HashMap[Long, Int] =
  a ++ b.map { case (k, v) => k -> (v + a.getOrElse(k, 0)) }

var pairs = new HashMap[(Long, Long), Int]

val results = graph.aggregateMessages[HashMap[Long, Int]](mapUDF, reduceUDF)
results.foreach(result => {
  result._2.filter(p => p._1 != result._1).foreach(map => {
    val a = result._1
    val b = map._1
    if (!pairs.contains((a, b)) && !pairs.contains((b, a)))
      pairs += (a, b) -> map._2
  })
})
pairs.foreach(println(_))
The exception:

TaskSetManager: Lost task 1.0 in stage 3.0 (TID 7, 10.42.0.17):
java.lang.NullPointerException
    at atur.node.learner.Test$.mapUDF(SimpleDisjointnessLearner.scala:147)
    at atur.node.learner.Test$$anonfun$4.apply(SimpleDisjointnessLearner.scala:155)
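To make the result I am after concrete, here is the same computation done in
plain Scala (no Spark) on the toy data above; the helper name is just
illustrative:

```scala
// For each individual, emit every ordered pair of concepts it links to,
// then count how many individuals share each pair.
def coTypedPairs(typeEdges: Seq[(Long, Long)]): Map[(Long, Long), Int] =
  typeEdges
    .groupBy(_._1)                        // individual -> its "type" edges
    .values
    .flatMap { edges =>
      val concepts = edges.map(_._2).sorted
      for {
        i <- concepts.indices
        j <- (i + 1) until concepts.length
      } yield (concepts(i), concepts(j))  // one ordered pair per concept pair
    }
    .groupBy(identity)
    .map { case (pair, occs) => pair -> occs.size }

val edges = Seq((1L, 11L), (1L, 14L), (1L, 13L),
                (2L, 11L), (2L, 12L),
                (3L, 11L), (3L, 13L))
println(coTypedPairs(edges))
// (11,13) is shared by individuals 1 and 3, so its count is 2;
// (11,12), (11,14) and (13,14) each come from a single individual.
```

On the toy graph this gives (11,13) -> 2 and a count of 1 for the other three
pairs, which is what I expect my GraphX version to produce.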
Best wishes,
June