@Dmitriy, @Trevor and @Andrew,

I have tested the row key assignment issue that I mentioned in my earlier
mail by writing separate code in which I assign a default value of 1 to
each row key of the DRM and then take the aggregating transpose.
I have committed this test code to the GitHub branch
<https://github.com/parth2691/Spark_Mahout/tree/Dmitriy-Lyubimov>.

The code is as follows:

    val inCoreA = dense((1, 1, 2, 3), (1, 2, 3, 4), (1, 3, 4, 5), (1, 4, 5, 6))
    val A = drmParallelize(m = inCoreA)

    // mapBlock: assign 1 to each row key
    val drm2 = A.mapBlock() {
      case (keys, block) =>
        for (row <- 0 until keys.size) {
          keys(row) = 1
        }
        (keys, block)
    }
    println("After New Cluster assignment")
    println("" + drm2.collect)

    val aggTranspose = drm2.t
    println("Result of aggregating transpose")
    println("" + aggTranspose.collect)

The output of the first println (after the new cluster assignment) should
be this:
{
0 => {0:1.0, 1: 1.0, 2: 2.0, 3: 3.0}
1 => {0:1.0, 1: 2.0, 2: 3.0, 3: 4.0}
2 => {0:1.0, 1: 3.0, 2: 4.0, 3: 5.0}
3 => {0:1.0, 1: 4.0, 2: 5.0, 3: 6.0}
}
(Here the zeroth column is used to store the centroid count, and columns
1, 2 and 3 contain the data.)
But it turns out to be this:
{
0 => {}
1 => {0:1.0,1:4.0,2:5.0,3:6.0}
2 => {}
3 => {}
}
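One possible reading of this output, which I have not verified against the Mahout source: if collect writes every (key, row) pair into the in-core matrix at the row index given by its key, then four rows that all carry key 1 overwrite one another and only the last one survives. The plain-Scala sketch below uses no Mahout calls; collectByKey is a hypothetical helper that only mimics that assumed behaviour.

```scala
object CollectSketch {
  // Stand-in for the (key, row vector) pairs of a DrmLike[Int].
  // Vector here is scala.collection.immutable.Vector, not Mahout's Vector.
  type Row = (Int, Vector[Double])

  /** Assumed behaviour: each row lands at the index given by its key,
    * so rows sharing a key overwrite each other and the last one wins. */
  def collectByKey(rows: Seq[Row], nrow: Int): Vector[Vector[Double]] = {
    val out = Array.fill(nrow)(Vector.empty[Double])
    for ((k, v) <- rows) out(k) = v
    out.toVector
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(
      Vector(1.0, 1.0, 2.0, 3.0),
      Vector(1.0, 2.0, 3.0, 4.0),
      Vector(1.0, 3.0, 4.0, 5.0),
      Vector(1.0, 4.0, 5.0, 6.0))
    // Every key reassigned to 1, as in the mapBlock test.
    val keyed = data.map(v => (1, v))
    collectByKey(keyed, nrow = 4).zipWithIndex.foreach {
      case (row, i) => println(s"$i => ${row.mkString("{", ", ", "}")}")
    }
  }
}
```

If that assumption holds, the collected matrix would not be a reliable way to inspect a DRM whose keys are no longer unique.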
And the result of the aggregating transpose should be:
{
0 => {1: 4.0}
1 => {1: 10.0}
2 => {1: 14.0}
3 => {1: 18.0}
}
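To make explicit what I mean by the aggregating transpose, here is a plain-Scala sketch with no Mahout calls. aggregatingTranspose is a hypothetical helper, not a Samsara API: it sums all rows that share a key and transposes the result, so that entry (c, k) holds the sum of column c over the rows keyed k. It assumes a non-empty input with keys in the range 0 until numKeys.

```scala
object AggTransposeSketch {
  // Stand-in for the (key, row vector) pairs of a DrmLike[Int].
  // Vector here is scala.collection.immutable.Vector, not Mahout's Vector.
  type Row = (Int, Vector[Double])

  /** Sums the rows sharing a key, then transposes:
    * result(c)(k) = sum of row(c) over all rows whose key is k. */
  def aggregatingTranspose(rows: Seq[Row], numKeys: Int): Vector[Vector[Double]] = {
    val ncol = rows.head._2.length
    // One accumulator row per key.
    val sums = Array.fill(numKeys, ncol)(0.0)
    for ((k, v) <- rows; c <- 0 until ncol) sums(k)(c) += v(c)
    // Transposed view: ncol rows, numKeys columns.
    Vector.tabulate(ncol, numKeys)((c, k) => sums(k)(c))
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(
      Vector(1.0, 1.0, 2.0, 3.0),
      Vector(1.0, 2.0, 3.0, 4.0),
      Vector(1.0, 3.0, 4.0, 5.0),
      Vector(1.0, 4.0, 5.0, 6.0))
    // Every key reassigned to 1, as in the mapBlock test.
    val keyed = data.map(v => (1, v))
    aggregatingTranspose(keyed, numKeys = 2)
      .foreach(row => println(row.mkString(" ")))
  }
}
```

With the leading column of ones, the first row of the result holds the per-key point counts and the remaining rows hold the per-key sums, which is what the centroid update needs.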
I have referred to the book written by Andrew and Dmitriy, Apache Mahout:
Beyond MapReduce
<https://www.amazon.com/Apache-Mahout-MapReduce-Dmitriy-Lyubimov/dp/1523775785>.
Aggregating transpose and other concepts are explained very nicely there,
but I am unable to find any example where row keys are assigned new values.
The Mahout Samsara manual
http://apache.github.io/mahout/doc/ScalaSparkBindings.html also does not
contain any such example.

It would be great if I could get a reference to a solution for this issue.

Thanks
Parth Khatwani
On Sat, Apr 15, 2017 at 12:13 AM, Andrew Palumbo <[email protected]> wrote:
> +1
>
>
>
>
>
> -------- Original message --------
> From: Trevor Grant <[email protected]>
> Date: 04/14/2017 11:40 (GMT-08:00)
> To: [email protected]
> Subject: Re: Trying to write the KMeans Clustering Using "Apache Mahout
> Samsara"
>
> Parth and Dmitriy,
>
> This is awesome- as a follow on can we work on getting this rolled in to
> the algorithms framework?
>
> Happy to work with you on this Parth!
>
> Trevor Grant
> Data Scientist
> https://github.com/rawkintrevo
> http://stackexchange.com/users/3002022/rawkintrevo
> http://trevorgrant.org
>
> *"Fortunate is he, who is able to know the causes of things." -Virgil*
>
>
> On Fri, Apr 14, 2017 at 1:27 PM, Dmitriy Lyubimov <[email protected]>
> wrote:
>
> > i would think reassigning keys should work in most cases.
> > The only exception is that technically Spark contracts imply that the
> > effect should be idempotent if a task is retried, which might be a problem
> > in the specific scenario of the object tree coming out of the block cache,
> > which can stay there and be retried again. but specifically w.r.t. this
> > key assignment i don't see any problem, since the action obviously would
> > be idempotent even if this code is run multiple times on the same
> > (key, block) pair. This part should be good IMO.
> >
> > On Fri, Apr 14, 2017 at 2:26 AM, KHATWANI PARTH BHARAT <
> > [email protected]> wrote:
> >
> > > @Dmitriy Sir,
> > > In the k-means code above, I think I am doing the following incorrectly:
> > >
> > > Assigning the closest centroid index to the row keys of the DRM
> > >
> > > //11. Iterate over the data matrix (in DrmLike[Int] format) to calculate
> > > //    the initial centroids
> > > dataDrmX.mapBlock() {
> > >   case (keys, block) =>
> > >     for (row <- 0 until block.nrow) {
> > >       var dataPoint = block(row, ::)
> > >
> > >       //12. findTheClosestCentriod finds the centroid closest to the
> > >       //    data point specified by "dataPoint"
> > >       val closesetIndex = findTheClosestCentriod(dataPoint, centriods)
> > >
> > >       //13. assign the closest index to the key
> > >       keys(row) = closesetIndex
> > >     }
> > >     keys -> block
> > > }
> > >
> > > In step 12 I find the centroid closest to the current dataPoint.
> > > In step 13 I assign closesetIndex to the key of the corresponding
> > > row represented by dataPoint.
> > > I think I am doing step 13 incorrectly.
> > >
> > > Also, I am unable to find a proper reference for this in the reference
> > > links which you have mentioned above.
> > >
> > >
> > > Thanks & Regards
> > > Parth Khatwani
> > >
> > >
> > >
> > >
> > >
> > > On Thu, Apr 13, 2017 at 6:24 PM, KHATWANI PARTH BHARAT <
> > > [email protected]> wrote:
> > >
> > > > Dmitriy Sir,
> > > > I have created a GitHub branch having the initial k-means code:
> > > > <https://github.com/parth2691/Spark_Mahout/tree/Dmitriy-Lyubimov>
> > > >
> > > >
> > > > Thanks & Regards
> > > > Parth Khatwani
> > > >
> > > > On Thu, Apr 13, 2017 at 3:19 AM, Andrew Palumbo <[email protected]>
> > > > wrote:
> > > >
> > > >> +1 to creating a branch.
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> -------- Original message --------
> > > >> From: Dmitriy Lyubimov <[email protected]>
> > > >> Date: 04/12/2017 11:25 (GMT-08:00)
> > > >> To: [email protected]
> > > >> Subject: Re: Trying to write the KMeans Clustering Using "Apache
> > Mahout
> > > >> Samsara"
> > > >>
> > > >> can't say i can read this code well formatted that way...
> > > >>
> > > > >> it would seem to me that the code is not using the broadcast
> > > > >> variable and instead is using a closure variable. that's the only
> > > > >> thing i can immediately see by looking in the middle of it.
> > > > >>
> > > > >> it would be better if you created a branch on github for that code;
> > > > >> that would allow for easy check-outs and comments.
> > > >>
> > > >> -d
> > > >>
> > > >> On Wed, Apr 12, 2017 at 10:29 AM, KHATWANI PARTH BHARAT <
> > > >> [email protected]> wrote:
> > > >>
> > > >> > @Dmitriy Sir
> > > >> >
> > > > >> > I have completed the k-means code as per the algorithm you have
> > > > >> > outlined above.
> > > > >> >
> > > > >> > My code is as follows.
> > > > >> >
> > > > >> > This code works fine up to step number 10.
> > > > >> >
> > > > >> > In step 11 I am assigning the new centroid index to the
> > > > >> > corresponding row key of the data point in the matrix.
> > > > >> > I think I am doing something wrong in step 11; maybe I am using
> > > > >> > incorrect syntax.
> > > > >> >
> > > > >> > Can you help me find out what I am doing wrong?
> > > >> >
> > > >> >
> > > >> > //start of main method
> > > >> >
> > > >> > def main(args: Array[String]) {
> > > >> > //1. initialize the spark and mahout context
> > > >> > val conf = new SparkConf()
> > > >> > .setAppName("DRMExample")
> > > >> > .setMaster(args(0))
> > > >> > .set("spark.serializer", "org.apache.spark.serializer.
> > > >> > KryoSerializer")
> > > >> > .set("spark.kryo.registrator",
> > > >> > "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
> > > >> > implicit val sc = new SparkDistributedContext(new
> > > >> SparkContext(conf))
> > > >> >
> > > >> > //2. read the data file and save it in the rdd
> > > >> > val lines = sc.textFile(args(1))
> > > >> >
> > > >> > //3. convert data read in as string in to array of double
> > > >> > val test = lines.map(line => line.split('\t').map(_.
> toDouble))
> > > >> >
> > > >> > //4. add a column having value 1 in array of double this will
> > > >> > create something like (1 | D)', which will be used while
> > calculating
> > > >> > (1 | D)'
> > > >> > val augumentedArray = test.map(addCentriodColumn _)
> > > >> >
> > > >> > //5. convert rdd of array of double in rdd of DenseVector
> > > >> > val rdd = augumentedArray.map(dvec(_))
> > > >> >
> > > >> > //6. convert rdd to DrmRdd
> > > >> > val rddMatrixLike: DrmRdd[Int] = rdd.zipWithIndex.map { case
> (v,
> > > >> > idx) => (idx.toInt, v) } //7. convert DrmRdd to
> > > >> > CheckpointedDrm[Int] val matrix = drmWrap(rddMatrixLike)
> //8.
> > > >> > seperating the column having all ones created in step 4 and will
> use
> > > >> > it later val oneVector = matrix(::, 0 until 1) //9.
> final
> > > >> > input data in DrmLike[Int] format val dataDrmX = matrix(::, 1
> > until
> > > >> > 4) //9. Sampling to select initial centriods val
> > > >> > centriods = drmSampleKRows(dataDrmX, 2, false) centriods.size
> > > >> > //10. Broad Casting the initial centriods val broadCastMatrix =
> > > >> > drmBroadcast(centriods) //11. Iterating over the Data
> > > >> > Matrix(in DrmLike[Int] format) to calculate the initial centriods
> > > >> > dataDrmX.mapBlock() { case (keys, block) => for (row
> <-
> > 0
> > > >> > until block.nrow) { var dataPoint = block(row, ::)
> > > >> > //12. findTheClosestCentriod find the closest centriod to
> > the
> > > >> > Data point specified by "dataPoint" val closesetIndex =
> > > >> > findTheClosestCentriod(dataPoint, centriods)
> > //13.
> > > >> > assigning closest index to key keys(row) = closesetIndex
> > > >> > } keys -> block }
> > > >> >
> > > >> > //14. Calculating the (1|D) val b = (oneVector cbind
> > > >> > dataDrmX) //15. Aggregating Transpose (1|D)' val
> > bTranspose
> > > >> > = (oneVector cbind dataDrmX).t // after step 15 bTranspose will
> > > >> > have data in the following format /*(n+1)*K where
> n=dimension
> > > >> > of the data point, K=number of clusters * zeroth row will
> contain
> > > >> > the count of points assigned to each cluster * assuming 3d data
> > > >> > points * */
> > > >> >
> > > >> >
> > > >> > val nrows = b.nrow.toInt //16. slicing the count vectors
> out
> > > >> > val pointCountVectors = drmBroadcast(b(0 until 1, ::).collect(0,
> > ::))
> > > >> > val vectorSums = b(1 until nrows, ::) //17. dividing the
> data
> > > >> > point by count vector vectorSums.mapBlock() { case (keys,
> > > >> > block) => for (row <- 0 until block.nrow) {
> > block(row,
> > > >> > ::) /= pointCountVectors } keys -> block }
> //18.
> > > >> > seperating the count vectors val newCentriods =
> vectorSums.t(::,1
> > > >> > until centriods.size) //19. iterate over the above code
> > > >> > till convergence criteria is meet }//end of main method
> > > >> >
> > > >> >
> > > >> >
> > > >> > // method to find the closest centriod to data point( vec:
> Vector
> > > >> > in the arguments) def findTheClosestCentriod(vec: Vector, matrix:
> > > >> > Matrix): Int = {
> > > >> > var index = 0
> > > >> > var closest = Double.PositiveInfinity
> > > >> > for (row <- 0 until matrix.nrow) {
> > > >> > val squaredSum = ssr(vec, matrix(row, ::))
> > > >> > val tempDist = Math.sqrt(ssr(vec, matrix(row, ::)))
> > > >> > if (tempDist < closest) {
> > > >> > closest = tempDist
> > > >> > index = row
> > > >> > }
> > > >> > }
> > > >> > index
> > > >> > }
> > > >> >
> > > >> > //calculating the sum of squared distance between the
> > > points(Vectors)
> > > >> > def ssr(a: Vector, b: Vector): Double = {
> > > >> > (a - b) ^= 2 sum
> > > >> > }
> > > >> >
> > > >> > //method used to create (1|D)
> > > >> > def addCentriodColumn(arg: Array[Double]): Array[Double] = {
> > > >> > val newArr = new Array[Double](arg.length + 1)
> > > >> > newArr(0) = 1.0;
> > > >> > for (i <- 0 until (arg.size)) {
> > > >> > newArr(i + 1) = arg(i);
> > > >> > }
> > > >> > newArr
> > > >> > }
> > > >> >
> > > >> >
> > > >> > Thanks & Regards
> > > >> > Parth Khatwani
> > > >> >
> > > >> >
> > > >> >
> > > >> > On Mon, Apr 3, 2017 at 7:37 PM, KHATWANI PARTH BHARAT <
> > > >> > [email protected]> wrote:
> > > >> >
> > > >> > >
> > > >> > > ---------- Forwarded message ----------
> > > >> > > From: Dmitriy Lyubimov <[email protected]>
> > > >> > > Date: Fri, Mar 31, 2017 at 11:34 PM
> > > >> > > Subject: Re: Trying to write the KMeans Clustering Using "Apache
> > > >> Mahout
> > > >> > > Samsara"
> > > >> > > To: "[email protected]" <[email protected]>
> > > >> > >
> > > >> > >
> > > > >> > > ps1 this assumes row-wise construction of A based on training
> > > > >> > > set of m n-dimensional points.
> > > > >> > > ps2 since we are doing multiple passes over A it may make sense
> > > > >> > > to make sure it is committed to spark cache (by using checkpoint
> > > > >> > > api), if spark is used
> > > >> > >
> > > >> > > On Fri, Mar 31, 2017 at 10:53 AM, Dmitriy Lyubimov <
> > > [email protected]
> > > >> >
> > > >> > > wrote:
> > > >> > >
> > > > >> > > > here is the outline. For details of APIs, please refer to
> > > > >> > > > samsara manual [2], i will not be repeating it.
> > > >> > > >
> > > > >> > > > Assume your training data input is m x n matrix A. For
> > > > >> > > > simplicity let's assume it's a DRM with int row keys, i.e.,
> > > > >> > > > DrmLike[Int].
> > > >> > > >
> > > >> > > > Initialization:
> > > >> > > >
> > > > >> > > > First, classic k-means starts by selecting initial clusters,
> > > > >> > > > by sampling them out. You can do that by using sampling api
> > > > >> > > > [1], thus forming a k x n in-memory matrix C (current
> > > > >> > > > centroids). C is therefore of Mahout's Matrix type.
> > > >> > > >
> > > > >> > > > You then proceed by alternating between cluster assignments
> > > > >> > > > and recomputing centroid matrix C till convergence based on
> > > > >> > > > some test or simply limited by epoch count budget, your choice.
> > > >> > > >
> > > > >> > > > Cluster assignments: here, we go over current generation of A
> > > > >> > > > and recompute centroid indexes for each row in A. Once we
> > > > >> > > > recompute index, we put it into the row key. You can do that
> > > > >> > > > by assigning centroid indices to keys of A using operator
> > > > >> > > > mapblock() (details in [2], [3], [4]). You also need to
> > > > >> > > > broadcast C in order to be able to access it in efficient
> > > > >> > > > manner inside mapblock() closure. Examples of that are plenty
> > > > >> > > > given in [2]. Essentially, in mapblock, you'd reform the row
> > > > >> > > > keys to reflect cluster index in C. while going over A, you'd
> > > > >> > > > have a "nearest neighbor" problem to solve for the row of A
> > > > >> > > > and centroids C. This is the bulk of computation really, and
> > > > >> > > > there are a few tricks there that can speed this step up in
> > > > >> > > > both exact and approximate manner, but you can start with a
> > > > >> > > > naive search.
> > > > >> > > > Centroid recomputation:
> > > > >> > > > once you assigned centroids to the keys of matrix A, you'd
> > > > >> > > > want to do an aggregating transpose of A to compute
> > > > >> > > > essentially average of row A grouped by the centroid key. The
> > > > >> > > > trick is to do a computation of (1|A)' which will result in a
> > > > >> > > > matrix of the shape (Counts/sums of cluster rows). This is the
> > > > >> > > > part i find difficult to explain without a latex graphics.
> > > >> > > >
> > > > >> > > > In Samsara, construction of (1|A)' corresponds to DRM
> > > > >> > > > expression
> > > >> > > >
> > > >> > > > (1 cbind A).t (again, see [2]).
> > > >> > > >
> > > >> > > > So when you compute, say,
> > > >> > > >
> > > >> > > > B = (1 | A)',
> > > >> > > >
> > > > >> > > > then B is (n+1) x k, so each column contains a vector
> > > > >> > > > corresponding to a cluster 1..k. In such column, the first
> > > > >> > > > element would be # of points in the cluster, and the rest of
> > > > >> > > > it would correspond to sum of all points. So in order to
> > > > >> > > > arrive to an updated matrix C, we need to collect B into
> > > > >> > > > memory, and slice out counters (first row) from the rest of it.
> > > >> > > >
> > > >> > > > So, to compute C:
> > > >> > > >
> > > >> > > > C <- B (2:,:) each row divided by B(1,:)
> > > >> > > >
> > > > >> > > > (watch out for empty clusters with 0 elements, this will cause
> > > > >> > > > lack of convergence and NaNs in the newly computed C).
> > > >> > > >
> > > > >> > > > This operation obviously uses subblocking and row-wise
> > > > >> > > > iteration over B, for which i am again making reference to [2].
> > > >> > > >
> > > >> > > >
> > > > >> > > > [1] https://github.com/apache/mahout/blob/master/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala#L149
> > > > >> > > >
> > > > >> > > > [2] Samsara manual, a bit dated but viable:
> > > > >> > > > http://apache.github.io/mahout/doc/ScalaSparkBindings.html
> > > > >> > > >
> > > > >> > > > [3] scaladoc, again, dated but largely viable for the purpose
> > > > >> > > > of this exercise:
> > > > >> > > > http://apache.github.io/mahout/0.10.1/docs/mahout-math-scala/index.htm
> > > > >> > > >
> > > > >> > > > [4] mapblock etc.
> > > > >> > > > http://apache.github.io/mahout/0.10.1/docs/mahout-math-scala/index.html#org.apache.mahout.math.drm.RLikeDrmOps
> > > >> > > >
> > > >> > > > On Fri, Mar 31, 2017 at 9:54 AM, KHATWANI PARTH BHARAT <
> > > >> > > > [email protected]> wrote:
> > > >> > > >
> > > > >> > > >> @Dmitriy can you please tell me again the approach to move
> > > > >> > > >> ahead.
> > > >> > > >>
> > > >> > > >>
> > > >> > > >> Thanks
> > > >> > > >> Parth Khatwani
> > > >> > > >>
> > > >> > > >>
> > > >> > > >> On Fri, Mar 31, 2017 at 10:15 PM, KHATWANI PARTH BHARAT <
> > > >> > > >> [email protected]> wrote:
> > > >> > > >>
> > > > >> > > >> > yes i am unable to figure out the way ahead.
> > > > >> > > >> > Like how to create the augmented matrix A := (0|D) which
> > > > >> > > >> > you have mentioned.
> > > >> > > >> >
> > > >> > > >> >
> > > >> > > >> > On Fri, Mar 31, 2017 at 10:10 PM, Dmitriy Lyubimov <
> > > >> > [email protected]
> > > >> > > >
> > > >> > > >> > wrote:
> > > >> > > >> >
> > > > >> > > >> >> has my reply to your post on @user been a bit confusing?
> > > >> > > >> >>
> > > >> > > >> >> On Fri, Mar 31, 2017 at 8:40 AM, KHATWANI PARTH BHARAT <
> > > >> > > >> >> [email protected]> wrote:
> > > >> > > >> >>
> > > > >> > > >> >> > Sir,
> > > > >> > > >> >> > I am trying to write the k-means clustering algorithm
> > > > >> > > >> >> > using Mahout Samsara, but I am a bit confused about how
> > > > >> > > >> >> > to leverage the Distributed Row Matrix for it. Can
> > > > >> > > >> >> > anybody help me with this?
> > > >> > > >> >> >
> > > >> > > >> >> >
> > > >> > > >> >> >
> > > >> > > >> >> >
> > > >> > > >> >> >
> > > >> > > >> >> > Thanks
> > > >> > > >> >> > Parth Khatwani
> > > >> > > >> >> >
> > > >> > > >> >>
> > > >> > > >> >
> > > >> > > >> >
> > > >> > > >>
> > > >> > > >
> > > >> > > >
> > > >> > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>