Carter, here is a quick and simple starting point for Spark (caveats: lots of improvements required for scaling and for graceful, efficient handling of the RDDs et al):
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object TopK {

  // Euclidean distance between two points; index 0 is the id,
  // indices 1 and 2 are the x/y coordinates
  def distance(x1: List[Int], x2: List[Int]): Double =
    math.sqrt(math.pow(x1(1) - x2(1), 2) + math.pow(x1(2) - x2(2), 2))

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "TopK") // local mode
    println(s"Running Spark Version ${sc.version}")

    // each line of data01.csv is "id,x,y"
    val file = sc.textFile("data01.csv")
    val data = file
      .map(line => line.split(","))
      .map(x1 => List(x1(0).toInt, x1(1).toInt, x1(2).toInt))

    println("data")
    for (d <- data.collect) {
      println(d)
      println(d(0))
    }

    // just the ids
    val distList = for (d <- data) yield d(0)

    // every ordered pair of ids - fine for toy data, but collecting
    // to the driver like this won't scale
    val zipList = for (a <- distList.collect; b <- distList.collect)
      yield List(a, b)
    zipList.foreach(println(_))

    // distance for each pair of ids, keyed by the distance
    val dist = for (l <- zipList) yield {
      println(s"${l(0)} = ${l(1)}")
      val x1: List[Int] = data.filter(d => d(0) == l(0)).collect()(0)
      val x2: List[Int] = data.filter(d => d(0) == l(1)).collect()(0)
      Map(distance(x1, x2) -> l)
    }
    dist.foreach(println(_))
    // sort this for topK

    sc.stop()
  }
}

data01.csv:

1,68,93
2,12,90
3,45,76
4,86,54

HTH.
Cheers
<k/>

On Tue, May 27, 2014 at 4:10 AM, Carter <gyz...@hotmail.com> wrote:
> Any suggestion is very much appreciated.
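
P.S. On the "// sort this for topK" line above: here is a minimal sketch of
what that last step could look like if the pairwise distances are kept in an
RDD (RDD.cartesian for the pairs, groupByKey plus a per-point sort for the
neighbours) instead of being collected to the driver. The object name
TopKSorted and the parameter k are mine, just for illustration; the input
layout is the same data01.csv as above.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object TopKSorted {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "TopKSorted")
    val k = 2 // illustrative: number of neighbours to keep per point

    // same layout as data01.csv: id,x,y
    val data = sc.textFile("data01.csv")
      .map(_.split(","))
      .map(a => (a(0).toInt, (a(1).toInt, a(2).toInt)))

    // all pairs of distinct points, built on the cluster rather than
    // via collect on the driver
    val pairs = data.cartesian(data)
      .filter { case ((id1, _), (id2, _)) => id1 != id2 }

    // (id1, (distance, id2)) for every pair
    val dists = pairs.map { case ((id1, (x1, y1)), (id2, (x2, y2))) =>
      (id1, (math.sqrt(math.pow(x1 - x2, 2) + math.pow(y1 - y2, 2)), id2))
    }

    // k nearest neighbours per id: sort each id's distances, keep k
    val topK = dists.groupByKey()
      .mapValues(_.toList.sortBy(_._1).take(k))

    topK.collect.foreach(println)
    sc.stop()
  }
}

groupByKey is the simple route here; for bigger data you would want
something like aggregateByKey with a bounded per-key structure so the full
neighbour list never materialises.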