That was a bit of a brute-force search, so I changed the code to use a UDF to compute the dot product between the two TF-IDF vectors and then sort on the new column.
package com.ss.ml.clustering import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.functions._ import org.apache.spark.ml.feature.{IDF, Tokenizer, HashingTF} import org.apache.spark.ml.linalg.Vector object ClusteringBasics extends App { val spark = SparkSession.builder().appName("Clustering Basics").master("local").getOrCreate() import spark.implicits._ val df = spark.read.option("header", "false").csv("data") val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words") val tf = new HashingTF().setInputCol("words").setOutputCol("tf") val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf") val df1 = tf.transform(tk.transform(df)) val idfs = idf.fit(df1).transform(df1) val nn = nearestNeighbour("<http://dbpedia.org/resource/Barack_Obama>", idfs) println(nn) def nearestNeighbour(uri: String, ds: DataFrame) : String = { val tfIdfSrc = ds.filter(s"_c0 == '$uri'").take(1)(0).getAs[Vector]("tf-idf") def dorProduct(vectorA: Vector) = { var dp = 0.0 var index = vectorA.size - 1 for (i <- 0 to index) { dp += vectorA(i) * tfIdfSrc(i) } dp } val dpUdf = udf((v1: Vector, v2: Vector) => dorProduct(v1)) ds.filter(s"_c0 != '$uri'").withColumn("dp", dpUdf('tf-idf)).sort("dp").take(1)(0).getString(1) } } However, that is generating the exception below, Exception in thread "main" java.lang.RuntimeException: Unsupported literal type class org.apache.spark.ml.feature.IDF idf_e49381a285dd at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:57) at org.apache.spark.sql.functions$.lit(functions.scala:101) at org.apache.spark.sql.Column.$minus(Column.scala:672) at com.ss.ml.clustering.ClusteringBasics$.nearestNeighbour(ClusteringBasics.scala:36) at com.ss.ml.clustering.ClusteringBasics$.delayedEndpoint$com$ss$ml$clustering$ClusteringBasics$1(ClusteringBasics.scala:22) at com.ss.ml.clustering.ClusteringBasics$delayedInit$body.apply(ClusteringBasics.scala:8) at scala.Function0$class.apply$mcV$sp(Function0.scala:34) at 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:76) at scala.App$$anonfun$main$1.apply(App.scala:76) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) at scala.App$class.main(App.scala:76) at com.ss.ml.clustering.ClusteringBasics$.main(ClusteringBasics.scala:8) at com.ss.ml.clustering.ClusteringBasics.main(ClusteringBasics.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) On Sun, Nov 13, 2016 at 10:56 PM, Meeraj Kunnumpurath < mee...@servicesymphony.com> wrote: > This is what I have done, is there a better way of doing this? 
> > val df = spark.read.option("header", "false").csv("data") > > > val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words") > > val tf = new HashingTF().setInputCol("words").setOutputCol("tf") > > val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf") > > > val df1 = tf.transform(tk.transform(df)) > > val idfs = idf.fit(df1).transform(df1) > > > println(nearestNeighbour("http://dbpedia.org/resource/Barack_Obama", > idfs)) > > > def nearestNeighbour(uri: String, ds: DataFrame) : String = { > > var res : Row = null > > var metric : Double = 0 > > val tfIdfSrc = ds.filter(s"_c0 == '$uri'").take(1)(0).getAs[ > Vector]("tf-idf") > > ds.filter("_c0 != '" + uri + "'").foreach { r => > > val tfIdfDst = r.getAs[Vector]("tf-idf") > > val dp = dorProduct(tfIdfSrc, tfIdfDst) > > if (dp > metric) { > > res = r > > metric = dp > > } > > } > > return res.getAs[String]("_c1") > > } > > > def cosineSimilarity(vectorA: Vector, vectorB: Vector) = { > > var dotProduct = 0.0 > > var normA = 0.0 > > var normB = 0.0 > > var index = vectorA.size - 1 > > for (i <- 0 to index) { > > dotProduct += vectorA(i) * vectorB(i) > > normA += Math.pow(vectorA(i), 2) > > normB += Math.pow(vectorB(i), 2) > > } > > (dotProduct / (Math.sqrt(normA) * Math.sqrt(normB))) > > } > > > def dorProduct(vectorA: Vector, vectorB: Vector) = { > > var dp = 0.0 > > var index = vectorA.size - 1 > > for (i <- 0 to index) { > > dp += vectorA(i) * vectorB(i) > > } > > dp > > } > > On Sun, Nov 13, 2016 at 7:04 PM, Meeraj Kunnumpurath < > mee...@servicesymphony.com> wrote: > >> Hello, >> >> I have a dataset containing TF-IDF vectors for a corpus of documents. How >> do I perform a nearest neighbour search on the dataset, using cosine >> similarity? 
>> >> val df = spark.read.option("header", "false").csv("data") >> >> val tk = new Tokenizer().setInputCol("_c2").setOutputCol("words") >> >> val tf = new HashingTF().setInputCol("words").setOutputCol("tf") >> >> val idf = new IDF().setInputCol("tf").setOutputCol("tf-idf") >> >> val df1 = tf.transform(tk.transform(df)) >> >> idf.fit(df1).transform(df1).select("tf-idf").show(10) >> Thank you >> >> -- >> *Meeraj Kunnumpurath* >> >> >> *Director and Executive PrincipalService Symphony Ltd00 44 7702 693597* >> >> *00 971 50 409 0169mee...@servicesymphony.com >> <mee...@servicesymphony.com>* >> > > > > -- > *Meeraj Kunnumpurath* > > > *Director and Executive PrincipalService Symphony Ltd00 44 7702 693597* > > *00 971 50 409 0169mee...@servicesymphony.com <mee...@servicesymphony.com>* > -- *Meeraj Kunnumpurath* *Director and Executive PrincipalService Symphony Ltd00 44 7702 693597* *00 971 50 409 0169mee...@servicesymphony.com <mee...@servicesymphony.com>*