@Dmitriy Sir,
I have completed the k-means code as per the algorithm you have outlined above.
My code is as follows.
This code works fine up to step number 10.
In step 11 I am assigning the new centroid index to the corresponding row key
of the data point in the matrix.
I think I am doing something wrong in step 11; maybe I am using incorrect
syntax.
Can you help me find out what I am doing wrong?
//start of main method
def main(args: Array[String]) {
  //1. initialize the Spark and Mahout context
  val conf = new SparkConf()
    .setAppName("DRMExample")
    .setMaster(args(0))
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator",
      "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
  implicit val sc = new SparkDistributedContext(new SparkContext(conf))

  //2. read the data file and save it in an RDD
  val lines = sc.textFile(args(1))

  //3. convert the data read in as strings into arrays of Double
  val test = lines.map(line => line.split('\t').map(_.toDouble))

  //4. prepend a column having value 1 to each array of Double; this
  //   creates something like (1 | D), which will be used while
  //   calculating (1 | D)'
  val augumentedArray = test.map(addCentriodColumn _)

  //5. convert the RDD of Array[Double] into an RDD of DenseVector
  val rdd = augumentedArray.map(dvec(_))

  //6. convert the RDD to a DrmRdd
  val rddMatrixLike: DrmRdd[Int] =
    rdd.zipWithIndex.map { case (v, idx) => (idx.toInt, v) }

  //7. convert the DrmRdd to a CheckpointedDrm[Int]
  val matrix = drmWrap(rddMatrixLike)

  //8. separate out the column having all ones created in step 4; it
  //   will be used later
  val oneVector = matrix(::, 0 until 1)

  //9. final input data in DrmLike[Int] format
  val dataDrmX = matrix(::, 1 until 4)

  //9. sampling to select the initial centroids
  val centriods = drmSampleKRows(dataDrmX, 2, false)
  centriods.size

  //10. broadcasting the initial centroids
  val broadCastMatrix = drmBroadcast(centriods)

  //11. iterating over the data matrix (in DrmLike[Int] format) to
  //    assign each data point to its closest centroid
  dataDrmX.mapBlock() {
    case (keys, block) =>
      for (row <- 0 until block.nrow) {
        val dataPoint = block(row, ::)
        //12. findTheClosestCentriod finds the centroid closest to the
        //    data point specified by "dataPoint"
        val closesetIndex = findTheClosestCentriod(dataPoint, centriods)
        //13. assigning the closest index to the key
        keys(row) = closesetIndex
      }
      keys -> block
  }

  //14. calculating (1 | D)
  val b = (oneVector cbind dataDrmX)

  //15. aggregating transpose (1 | D)'
  val bTranspose = (oneVector cbind dataDrmX).t
  // after step 15, bTranspose will have data in the following format:
  /* (n+1) x K, where n = dimension of the data point and K = number of
   * clusters; the zeroth row will contain the count of points assigned
   * to each cluster (assuming 3-d data points)
   */

  val nrows = b.nrow.toInt

  //16. slicing the count vector out
  val pointCountVectors = drmBroadcast(b(0 until 1, ::).collect(0, ::))
  val vectorSums = b(1 until nrows, ::)

  //17. dividing the data point sums by the count vector
  vectorSums.mapBlock() {
    case (keys, block) =>
      for (row <- 0 until block.nrow) {
        block(row, ::) /= pointCountVectors
      }
      keys -> block
  }

  //18. separating the count vectors out
  val newCentriods = vectorSums.t(::, 1 until centriods.size)

  //19. iterate over the above code till the convergence criteria are met
} //end of main method
// method to find the centroid closest to a data point (vec: Vector in
// the arguments)
def findTheClosestCentriod(vec: Vector, matrix: Matrix): Int = {
  var index = 0
  var closest = Double.PositiveInfinity
  for (row <- 0 until matrix.nrow) {
    val tempDist = Math.sqrt(ssr(vec, matrix(row, ::)))
    if (tempDist < closest) {
      closest = tempDist
      index = row
    }
  }
  index
}
//calculates the sum of squared differences between two points (Vectors)
def ssr(a: Vector, b: Vector): Double = {
  ((a - b) ^= 2).sum
}
//method used to create (1 | D)
def addCentriodColumn(arg: Array[Double]): Array[Double] = {
  val newArr = new Array[Double](arg.length + 1)
  newArr(0) = 1.0
  for (i <- 0 until arg.length) {
    newArr(i + 1) = arg(i)
  }
  newArr
}
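For comparison, here is a self-contained plain-Scala sketch of one full k-means iteration (assignment plus centroid recomputation with an empty-cluster guard). It has no Mahout/Spark dependencies and all names are hypothetical; it is only meant to sanity-check the per-step logic above:

```scala
object KMeansSketch {
  // sum of squared differences between two points, mirroring ssr above
  def ssr(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // index of the centroid closest to the given point
  def closestCentroid(point: Array[Double], centroids: Array[Array[Double]]): Int =
    centroids.indices.minBy(i => ssr(point, centroids(i)))

  // one full k-means iteration: assignment step + centroid recomputation
  def iterate(points: Array[Array[Double]],
              centroids: Array[Array[Double]]): Array[Array[Double]] = {
    val n = points.head.length
    val k = centroids.length
    val sums = Array.fill(k, n)(0.0)
    val counts = Array.fill(k)(0)
    for (p <- points) {
      val c = closestCentroid(p, centroids)
      counts(c) += 1
      for (j <- 0 until n) sums(c)(j) += p(j)
    }
    // empty clusters keep their old centroid to avoid division by zero / NaNs
    Array.tabulate(k) { c =>
      if (counts(c) == 0) centroids(c)
      else sums(c).map(_ / counts(c))
    }
  }

  def main(args: Array[String]): Unit = {
    val points = Array(Array(1.0, 1.0), Array(1.2, 0.8), Array(9.0, 9.0), Array(8.8, 9.2))
    val init = Array(Array(0.0, 0.0), Array(10.0, 10.0))
    val updated = iterate(points, init)
    println(updated.map(_.mkString("(", ", ", ")")).mkString(" "))
  }
}
```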
Thanks & Regards
Parth Khatwani
On Mon, Apr 3, 2017 at 7:37 PM, KHATWANI PARTH BHARAT <
[email protected]> wrote:
>
> ---------- Forwarded message ----------
> From: Dmitriy Lyubimov <[email protected]>
> Date: Fri, Mar 31, 2017 at 11:34 PM
> Subject: Re: Trying to write the KMeans Clustering Using "Apache Mahout
> Samsara"
> To: "[email protected]" <[email protected]>
>
>
> ps1: this assumes row-wise construction of A based on a training set of m
> n-dimensional points.
> ps2: since we are doing multiple passes over A, it may make sense to make
> sure it is committed to the Spark cache (by using the checkpoint api), if
> Spark is used
>
> On Fri, Mar 31, 2017 at 10:53 AM, Dmitriy Lyubimov <[email protected]>
> wrote:
>
> > Here is the outline. For details of the APIs, please refer to the Samsara
> > manual [2]; I will not be repeating it.
> >
> > Assume your training data input is an m x n matrix A. For simplicity,
> > let's assume it's a DRM with int row keys, i.e., DrmLike[Int].
> >
> > Initialization:
> >
> > First, classic k-means starts by selecting initial clusters, by sampling
> > them out. You can do that by using the sampling api [1], thus forming a
> > k x n in-memory matrix C (current centroids). C is therefore of Mahout's
> > Matrix type.
> >
> > You then proceed by alternating between cluster assignments and
> > recomputing the centroid matrix C till convergence, based on some test or
> > simply limited by an epoch count budget, your choice.
> >
> > Cluster assignments: here, we go over the current generation of A and
> > recompute centroid indexes for each row in A. Once we recompute an index,
> > we put it into the row key. You can do that by assigning centroid indices
> > to the keys of A using operator mapblock() (details in [2], [3], [4]).
> > You also need to broadcast C in order to be able to access it in an
> > efficient manner inside the mapblock() closure. Examples of that are
> > plenty given in [2]. Essentially, in mapblock, you'd reform the row keys
> > to reflect the cluster index in C. While going over A, you'd have a
> > "nearest neighbor" problem to solve for each row of A and the centroids
> > C. This is the bulk of the computation really, and there are a few tricks
> > there that can speed this step up in both exact and approximate manner,
> > but you can start with a naive search.
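As an aside on the naive search mentioned above: the square root can be dropped entirely, since the argmin over Euclidean distances equals the argmin over squared distances. A small self-contained sketch in plain Scala (no Mahout types; all names are hypothetical):

```scala
object NearestCentroid {
  // squared Euclidean distance; sqrt is unnecessary when only the argmin matters
  def squaredDist(a: Array[Double], b: Array[Double]): Double = {
    var s = 0.0
    var i = 0
    while (i < a.length) { val d = a(i) - b(i); s += d * d; i += 1 }
    s
  }

  // naive O(k * n) scan over all k centroids for one n-dimensional point
  def nearest(point: Array[Double], centroids: Array[Array[Double]]): Int = {
    var best = 0
    var bestDist = Double.PositiveInfinity
    for (c <- centroids.indices) {
      val d = squaredDist(point, centroids(c))
      if (d < bestDist) { bestDist = d; best = c }
    }
    best
  }

  def main(args: Array[String]): Unit = {
    val centroids = Array(Array(0.0, 0.0), Array(5.0, 5.0))
    println(nearest(Array(1.0, 1.0), centroids)) // closest to (0, 0)
    println(nearest(Array(4.0, 6.0), centroids)) // closest to (5, 5)
  }
}
```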
> >
> > Centroid recomputation:
> > once you have assigned centroids to the keys of matrix A, you'd want to
> > do an aggregating transpose of A to compute essentially the average of
> > the rows of A grouped by the centroid key. The trick is to do a
> > computation of (1|A)', which will result in a matrix of the shape
> > (counts/sums of cluster rows). This is the part I find difficult to
> > explain without latex graphics.
> >
> > In Samsara, construction of (1|A)' corresponds to the DRM expression
> >
> > (1 cbind A).t (again, see [2]).
> >
> > So when you compute, say,
> >
> > B = (1 | A)',
> >
> > then B is (n+1) x k, so each column contains a vector corresponding to a
> > cluster 1..k. In such a column, the first element would be the # of
> > points in the cluster, and the rest of it would correspond to the sum of
> > all the points. So in order to arrive at an updated matrix C, we need to
> > collect B into memory, and slice out the counters (first row) from the
> > rest of it.
> >
> > So, to compute C:
> >
> > C <- B(2:,:), each row divided by B(1,:)
> >
> > (watch out for empty clusters with 0 elements; these will cause lack of
> > convergence and NaNs in the newly computed C).
> >
> > This operation obviously uses subblocking and row-wise iteration over B,
> > for which I am again making reference to [2].
> >
> >
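To see the shapes involved in the (1|A)' trick with concrete numbers, here is a self-contained plain-Scala sketch (no Mahout API; all names are hypothetical). Row 0 of B accumulates the counts, rows 1..n the per-dimension sums, and C is obtained by the elementwise division described above:

```scala
object OneAugTranspose {
  // given points already keyed by cluster index, build B = (1|A)' as an
  // (n+1) x k matrix: row 0 holds counts, rows 1..n hold per-dimension sums
  def aggregate(keyed: Seq[(Int, Array[Double])], k: Int, n: Int): Array[Array[Double]] = {
    val b = Array.fill(n + 1, k)(0.0)
    for ((cluster, p) <- keyed) {
      b(0)(cluster) += 1.0 // the prepended 1 aggregates into a count
      for (j <- 0 until n) b(j + 1)(cluster) += p(j)
    }
    b
  }

  // C <- B(2:,:) with each row divided elementwise by B(1,:)
  def centroids(b: Array[Array[Double]]): Array[Array[Double]] = {
    val counts = b(0)
    b.drop(1).map(row => row.indices.toArray.map { c =>
      if (counts(c) == 0) 0.0 else row(c) / counts(c) // guard empty clusters
    })
  }

  def main(args: Array[String]): Unit = {
    val keyed = Seq(0 -> Array(1.0, 2.0), 0 -> Array(3.0, 4.0), 1 -> Array(10.0, 10.0))
    val b = aggregate(keyed, k = 2, n = 2)
    println(b(0).mkString(","))                            // counts per cluster
    println(centroids(b).map(_.mkString(",")).mkString(" | "))
  }
}
```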
> > [1] https://github.com/apache/mahout/blob/master/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala#L149
> >
> > [2] Samsara manual, a bit dated but viable:
> > http://apache.github.io/mahout/doc/ScalaSparkBindings.html
> >
> > [3] scaladoc, again, dated but largely viable for the purpose of this
> > exercise:
> > http://apache.github.io/mahout/0.10.1/docs/mahout-math-scala/index.html
> >
> > [4] mapblock etc.:
> > http://apache.github.io/mahout/0.10.1/docs/mahout-math-scala/index.html#org.apache.mahout.math.drm.RLikeDrmOps
> >
> > On Fri, Mar 31, 2017 at 9:54 AM, KHATWANI PARTH BHARAT <
> > [email protected]> wrote:
> >
> >> @Dmitriy, can you please again tell me the approach to move ahead?
> >>
> >>
> >> Thanks
> >> Parth Khatwani
> >>
> >>
> >> On Fri, Mar 31, 2017 at 10:15 PM, KHATWANI PARTH BHARAT <
> >> [email protected]> wrote:
> >>
> >> > Yes, I am unable to figure out the way ahead,
> >> > like how to create the augmented matrix A := (0|D) which you have
> >> > mentioned.
> >> >
> >> >
> >> > On Fri, Mar 31, 2017 at 10:10 PM, Dmitriy Lyubimov <[email protected]>
> >> > wrote:
> >> >
> >> >> Has my reply to your post on @user been a bit confusing?
> >> >>
> >> >> On Fri, Mar 31, 2017 at 8:40 AM, KHATWANI PARTH BHARAT <
> >> >> [email protected]> wrote:
> >> >>
> >> >> > Sir,
> >> >> > I am trying to write the k-means clustering algorithm using Mahout
> >> >> > Samsara, but I am a bit confused about how to leverage the
> >> >> > Distributed Row Matrix for the same. Can anybody help me with the
> >> >> > same?
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > Thanks
> >> >> > Parth Khatwani
> >> >> >
> >> >>
> >> >
> >> >
> >>
> >
> >
>
>