@Dmitriy Sir I have completed the Kmeans code as per the algorithm you have Outline above
My code is as follows This code works fine till step number 10 In step 11 i am assigning the new centriod index to corresponding row key of data Point in the matrix I think i am doing something wrong in step 11 may be i am using incorrect syntax Can you help me find out what am i doing wrong. //start of main method def main(args: Array[String]) { //1. initialize the spark and mahout context val conf = new SparkConf() .setAppName("DRMExample") .setMaster(args(0)) .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator") implicit val sc = new SparkDistributedContext(new SparkContext(conf)) //2. read the data file and save it in the rdd val lines = sc.textFile(args(1)) //3. convert data read in as string in to array of double val test = lines.map(line => line.split('\t').map(_.toDouble)) //4. add a column having value 1 in array of double this will create something like (1 | D)', which will be used while calculating (1 | D)' val augumentedArray = test.map(addCentriodColumn _) //5. convert rdd of array of double in rdd of DenseVector val rdd = augumentedArray.map(dvec(_)) //6. convert rdd to DrmRdd val rddMatrixLike: DrmRdd[Int] = rdd.zipWithIndex.map { case (v, idx) => (idx.toInt, v) } //7. convert DrmRdd to CheckpointedDrm[Int] val matrix = drmWrap(rddMatrixLike) //8. seperating the column having all ones created in step 4 and will use it later val oneVector = matrix(::, 0 until 1) //9. final input data in DrmLike[Int] format val dataDrmX = matrix(::, 1 until 4) //9. Sampling to select initial centriods val centriods = drmSampleKRows(dataDrmX, 2, false) centriods.size //10. Broad Casting the initial centriods val broadCastMatrix = drmBroadcast(centriods) //11. Iterating over the Data Matrix(in DrmLike[Int] format) to calculate the initial centriods dataDrmX.mapBlock() { case (keys, block) => for (row <- 0 until block.nrow) { var dataPoint = block(row, ::) //12. findTheClosestCentriod find the closest centriod to the Data point specified by "dataPoint" val closesetIndex = findTheClosestCentriod(dataPoint, centriods) //13. assigning closest index to key keys(row) = closesetIndex } keys -> block } //14. Calculating the (1|D) val b = (oneVector cbind dataDrmX) //15. Aggregating Transpose (1|D)' val bTranspose = (oneVector cbind dataDrmX).t // after step 15 bTranspose will have data in the following format /*(n+1)*K where n=dimension of the data point, K=number of clusters * zeroth row will contain the count of points assigned to each cluster * assuming 3d data points * */ val nrows = b.nrow.toInt //16. slicing the count vectors out val pointCountVectors = drmBroadcast(b(0 until 1, ::).collect(0, ::)) val vectorSums = b(1 until nrows, ::) //17. dividing the data point by count vector vectorSums.mapBlock() { case (keys, block) => for (row <- 0 until block.nrow) { block(row, ::) /= pointCountVectors } keys -> block } //18. seperating the count vectors val newCentriods = vectorSums.t(::,1 until centriods.size) //19. iterate over the above code till convergence criteria is meet }//end of main method // method to find the closest centriod to data point( vec: Vector in the arguments) def findTheClosestCentriod(vec: Vector, matrix: Matrix): Int = { var index = 0 var closest = Double.PositiveInfinity for (row <- 0 until matrix.nrow) { val squaredSum = ssr(vec, matrix(row, ::)) val tempDist = Math.sqrt(ssr(vec, matrix(row, ::))) if (tempDist < closest) { closest = tempDist index = row } } index } //calculating the sum of squared distance between the points(Vectors) def ssr(a: Vector, b: Vector): Double = { (a - b) ^= 2 sum } //method used to create (1|D) def addCentriodColumn(arg: Array[Double]): Array[Double] = { val newArr = new Array[Double](arg.length + 1) newArr(0) = 1.0; for (i <- 0 until (arg.size)) { newArr(i + 1) = arg(i); } newArr } Thanks & Regards Parth Khatwani On Mon, Apr 3, 2017 at 7:37 PM, KHATWANI PARTH BHARAT < h2016...@pilani.bits-pilani.ac.in> wrote: > > ---------- Forwarded message ---------- > From: Dmitriy Lyubimov <dlie...@gmail.com> > Date: Fri, Mar 31, 2017 at 11:34 PM > Subject: Re: Trying to write the KMeans Clustering Using "Apache Mahout > Samsara" > To: "dev@mahout.apache.org" <dev@mahout.apache.org> > > > ps1 this assumes row-wise construction of A based on training set of m > n-dimensional points. > ps2 since we are doing multiple passes over A it may make sense to make > sure it is committed to spark cache (by using checkpoint api), if spark is > used > > On Fri, Mar 31, 2017 at 10:53 AM, Dmitriy Lyubimov <dlie...@gmail.com> > wrote: > > > here is the outline. For details of APIs, please refer to samsara manual > > [2], i will not be be repeating it. > > > > Assume your training data input is m x n matrix A. For simplicity let's > > assume it's a DRM with int row keys, i.e., DrmLike[Int]. > > > > Initialization: > > > > First, classic k-means starts by selecting initial clusters, by sampling > > them out. You can do that by using sampling api [1], thus forming a k x n > > in-memory matrix C (current centroids). C is therefore of Mahout's Matrix > > type. > > > > You the proceed by alternating between cluster assignments and > > recompupting centroid matrix C till convergence based on some test or > > simply limited by epoch count budget, your choice. > > > > Cluster assignments: here, we go over current generation of A and > > recompute centroid indexes for each row in A. Once we recompute index, we > > put it into the row key . You can do that by assigning centroid indices > to > > keys of A using operator mapblock() (details in [2], [3], [4]). You also > > need to broadcast C in order to be able to access it in efficient manner > > inside mapblock() closure. Examples of that are plenty given in [2]. > > Essentially, in mapblock, you'd reform the row keys to reflect cluster > > index in C. while going over A, you'd have a "nearest neighbor" problem > to > > solve for the row of A and centroids C. This is the bulk of computation > > really, and there are a few tricks there that can speed this step up in > > both exact and approximate manner, but you can start with a naive search. > > > > Centroid recomputation: > > once you assigned centroids to the keys of marix A, you'd want to do an > > aggregating transpose of A to compute essentially average of row A > grouped > > by the centroid key. The trick is to do a computation of (1|A)' which > will > > results in a matrix of the shape (Counts/sums of cluster rows). This is > the > > part i find difficult to explain without a latex graphics. > > > > In Samsara, construction of (1|A)' corresponds to DRM expression > > > > (1 cbind A).t (again, see [2]). > > > > So when you compute, say, > > > > B = (1 | A)', > > > > then B is (n+1) x k, so each column contains a vector corresponding to a > > cluster 1..k. In such column, the first element would be # of points in > the > > cluster, and the rest of it would correspond to sum of all points. So in > > order to arrive to an updated matrix C, we need to collect B into memory, > > and slice out counters (first row) from the rest of it. > > > > So, to compute C: > > > > C <- B (2:,:) each row divided by B(1,:) > > > > (watch out for empty clusters with 0 elements, this will cause lack of > > convergence and NaNs in the newly computed C). > > > > This operation obviously uses subblocking and row-wise iteration over B, > > for which i am again making reference to [2]. > > > > > > [1] https://github.com/apache/mahout/blob/master/math-scala/ > > src/main/scala/org/apache/mahout/math/drm/package.scala#L149 > > > > [2], Sasmara manual, a bit dated but viable, http://apache.github. > > io/mahout/doc/ScalaSparkBindings.html > > > > [3] scaladoc, again, dated but largely viable for the purpose of this > > exercise: > > http://apache.github.io/mahout/0.10.1/docs/mahout-math-scala/index.htm > > > > [4] mapblock etc. http://apache.github.io/mahout/0.10.1/docs/mahout- > > math-scala/index.html#org.apache.mahout.math.drm.RLikeDrmOps > > > > On Fri, Mar 31, 2017 at 9:54 AM, KHATWANI PARTH BHARAT < > > h2016...@pilani.bits-pilani.ac.in> wrote: > > > >> @Dmitriycan you please again tell me the approach to move ahead. > >> > >> > >> Thanks > >> Parth Khatwani > >> > >> > >> On Fri, Mar 31, 2017 at 10:15 PM, KHATWANI PARTH BHARAT < > >> h2016...@pilani.bits-pilani.ac.in> wrote: > >> > >> > yes i am unable to figure out the way ahead. > >> > Like how to create the augmented matrix A := (0|D) which you have > >> > mentioned. > >> > > >> > > >> > On Fri, Mar 31, 2017 at 10:10 PM, Dmitriy Lyubimov <dlie...@gmail.com > > > >> > wrote: > >> > > >> >> was my reply for your post on @user has been a bit confusing? > >> >> > >> >> On Fri, Mar 31, 2017 at 8:40 AM, KHATWANI PARTH BHARAT < > >> >> h2016...@pilani.bits-pilani.ac.in> wrote: > >> >> > >> >> > Sir, > >> >> > I am trying to write the kmeans clustering algorithm using Mahout > >> >> Samsara > >> >> > but i am bit confused > >> >> > about how to leverage Distributed Row Matrix for the same. Can > >> anybody > >> >> help > >> >> > me with same. > >> >> > > >> >> > > >> >> > > >> >> > > >> >> > > >> >> > Thanks > >> >> > Parth Khatwani > >> >> > > >> >> > >> > > >> > > >> > > > > > >