looking for help in using graphx aggregateMessages

2015-07-31 Thread man june
Dear list,
Hi~ I am new to Spark and GraphX, and I have a little experience using Scala. I
want to use GraphX to calculate some basic statistics in linked open data,
which is basically a graph.

Suppose the graph contains only one type of edge, directed from individuals to
concepts, and the edge labels are all "type". I want to find all pairs of
concepts that have at least one individual linking to both of them. The
following is my current solution, but sadly it doesn't work.

Could you please help me work this out? Or are there better solutions? Any
help is appreciated!

  import scala.collection.mutable.HashMap

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.graphx._
  import org.apache.spark.rdd.RDD

  val conf = new SparkConf().setMaster("spark://MacBook-Pro:7077")
    .setAppName("My App").setJars(...)
  val sc = new SparkContext(conf)

  // initialize individuals (in small letters) and concepts (in upper case letters)
  val users: RDD[(org.apache.spark.graphx.VertexId, String)] =
    sc.parallelize(Array((1L, "a"), (2L, "b"), (3L, "e"),
      (11L, "A"), (12L, "B"), (13L, "C"), (14L, "D")))

  // initialize "type" edges
  val relationships: RDD[Edge[String]] =
    sc.parallelize(Array(
      Edge(1L, 11L, "type"), Edge(1L, 14L, "type"), Edge(1L, 13L, "type"),
      Edge(2L, 11L, "type"), Edge(2L, 12L, "type"),
      Edge(3L, 11L, "type"), Edge(3L, 13L, "type")))

  val graph = Graph(users, relationships)
  // for each vertex, the ids of its outgoing neighbours
  val indTypes = graph.collectNeighborIds(EdgeDirection.Out)

  // seems to be stupid functions...
  // sends each individual's set of concepts (as a count map) to every concept it links to
  def mapUDF(triplet: EdgeContext[String, String, HashMap[Long, Int]]) = {
    val msg = indTypes.filter(pred => pred._1 == triplet.srcId).first()._2
      .aggregate(new HashMap[Long, Int])((a, b) => a += (b -> 1), (a, b) => a ++ b)
    triplet.sendToDst(msg)
  }

  // merges two count maps, summing the counts of common keys
  def reduceUDF(a: HashMap[Long, Int], b: HashMap[Long, Int]): HashMap[Long, Int] =
    a ++ b.map { case (k, v) => k -> (v + a.getOrElse(k, 0)) }

  var pairs = new HashMap[(Long, Long), Int]
  val results = graph.aggregateMessages[HashMap[Long, Int]](mapUDF, reduceUDF)

  results.foreach(result => {
    result._2.filter(p => p._1 != result._1).foreach(map => {
      val a = result._1
      val b = map._1
      if (!pairs.contains((a, b)) && !pairs.contains((b, a)))
        pairs += (a, b) -> map._2
    })
  })
  pairs.foreach(println(_))
The exceptions:

TaskSetManager: Lost task 1.0 in stage 3.0 (TID 7, 10.42.0.17):
java.lang.NullPointerException
  at atur.node.learner.Test$.mapUDF(SimpleDisjointnessLearner.scala:147)
  at atur.node.learner.Test$$anonfun$4.apply(SimpleDisjointnessLearner.scala:155)
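
By the way, I suspect the NullPointerException comes from using the indTypes
RDD inside mapUDF: aggregateMessages runs the send function on the executors,
where nested RDD operations like filter/first are not allowed. Below is a
minimal alternative sketch (my own, not the code above) that avoids
aggregateMessages and the nested RDD access entirely, using only the
relationships RDD defined earlier:

  // individual id -> ids of the concepts it links to
  val conceptsPerIndividual = relationships
    .map(e => (e.srcId, e.dstId))
    .groupByKey()

  // emit every unordered pair of concepts per individual, then count
  val sharedCounts = conceptsPerIndividual
    .flatMap { case (_, concepts) =>
      val cs = concepts.toSeq.sorted // canonical order, so (a, b) == (b, a)
      for {
        i <- cs.indices
        j <- (i + 1) until cs.size
      } yield ((cs(i), cs(j)), 1)
    }
    .reduceByKey(_ + _) // concept pair -> number of shared individuals

  sharedCounts.collect().foreach(println)

On the toy graph above this should print ((11,13),2), since A and C are shared
by a and e, plus ((11,14),1), ((13,14),1), and ((11,12),1).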

best wishes,
June

question about customizing the kmeans distance measure

2015-05-12 Thread June
Dear list,

 

I am new to Spark, and I want to use the k-means algorithm in the MLlib package.

I am wondering whether it is possible to customize the distance measure used
by k-means, and if so, how?
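
For concreteness, a minimal sketch of a standard MLlib k-means call (toy data
of my own, assuming an existing SparkContext sc); it only exposes k and an
iteration count, and as far as I can tell the distance measure is hard-coded
to squared Euclidean, with no hook for a custom one:

  import org.apache.spark.mllib.clustering.KMeans
  import org.apache.spark.mllib.linalg.Vectors

  // made-up toy data: two obvious clusters in the plane
  val data = sc.parallelize(Seq(
    Vectors.dense(0.0, 0.0),
    Vectors.dense(1.0, 1.0),
    Vectors.dense(8.0, 9.0),
    Vectors.dense(9.0, 8.0)))

  // KMeans.train(data, k, maxIterations); no distance-measure parameter here
  val model = KMeans.train(data, 2, 20)
  model.clusterCenters.foreach(println)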

 

Many thanks!

 

June