OK, i dug into this before i read your question carefully, that was my bad.

Assuming you want the aggregate transpose of :
{
 0 => {0:1.0,    1: 1.0,    2: 1.0,   3: 3.0}
 1 => {0:1.0,    1: 2.0,    2: 3.0,   3: 4.0}
 2 => {0:1.0,    1: 3.0,    2: 4.0,   3: 5.0}
 3 => {0:1.0,    1: 4.0,    2: 5.0,   3: 6.0}
}

to be
{
 0 => {1: 5.0}   // (not 4.0) // and 6.0 in your example...
 1 => {1: 9.0}
 2 => {1: 12.0}
 3 => {1: 15.0}
}


Then why not replace the mapBlock statement as follows:

val drm2 = (A(::, 1 until 4) cbind 0.0).mapBlock() {
  case (keys, block) =>
    for(row <- 0 until block.nrow) block(row, 3) = block(row, ::).sum
    (keys, block)
}
val aggTranspose = drm2(::, 3 until 4).t
println("Result of aggregating tranpose")
println(""+aggTranspose.collect)

Where we are creating an empty row, then filling it with the row sums.

A distributed rowSums fn would be nice for just such an occasion... sigh

Let me know if that gets you going again.  That was simpler than I thought-
sorry for delay on this.

PS
Candidly, I didn't explore further once i understood teh question, but if
you are going to collect this to the driver anyway (not sure if that is the
case)
A(::, 1 until 4).rowSums would also work.





Trevor Grant
Data Scientist
https://github.com/rawkintrevo
http://stackexchange.com/users/3002022/rawkintrevo
http://trevorgrant.org

*"Fortunate is he, who is able to know the causes of things."  -Virgil*


On Thu, Apr 20, 2017 at 9:01 PM, KHATWANI PARTH BHARAT <
h2016...@pilani.bits-pilani.ac.in> wrote:

> @Trevor Sir,
> I have attached the sample data file and here is the line to complete the Data
> File <https://drive.google.com/open?id=0Bxnnu_Ig2Et9QjZoM3dmY1V5WXM>.
>
>
> Following is the link for the Github Branch For the code
> https://github.com/parth2691/Spark_Mahout/tree/Dmitriy-Lyubimov
>
> KmeansMahout.scala
> <https://github.com/parth2691/Spark_Mahout/blob/Dmitriy-Lyubimov/KmeansMahout.scala>
>  is
> the complete code
>
>
> I also have made sample program just to test the assigning new values to
> the key to Row Matrix and aggregating transpose.I think assigning new
> values to the key to Row Matrix and aggregating transpose is causing the
> main problem in Kmean code
> Following is the link to Github repo for this code.
> TestClusterAssign.scala
> <https://github.com/parth2691/Spark_Mahout/blob/Dmitriy-Lyubimov/TestClusterAssign.scala>
>
> above code contains the hard coded data. Following is the expected and the
> actual output of the above code
> Out of 1st println After New Cluster assignment should be
> This
> {
>  0 => {0:1.0,    1: 1.0,    2: 1.0,   3: 3.0}
>  1 => {0:1.0,    1: 2.0,    2: 3.0,   3: 4.0}
>  2 => {0:1.0,    1: 3.0,    2: 4.0,   3: 5.0}
>  3 => {0:1.0,    1: 4.0,    2: 5.0,   3: 6.0}
> }
> (Here zeroth Column is used to store the centriod count and column 1,2 and
> 3 Contains Data)
>
> But Turns out to be this
> {
>  0 => {}
>  1 => {0:1.0,1:4.0,2:5.0,3:6.0}
>  2 => {}
>  3 => {}
> }
> And the result of aggregating Transpose should be
> {
>  0 => {1: 4.0}
>  1 => {1: 9.0}
>  2 => {1: 12.0}
>  3 => {1: 15.0}
> }
>
>
> Thanks Trevor for such a great Help
>
>
>
>
> Best Regards
> Parth
>
>
>
>
>
>
>
>
> On Fri, Apr 21, 2017 at 4:20 AM, Trevor Grant <trevor.d.gr...@gmail.com>
> wrote:
>
>> Hey
>>
>> Sorry for delay- was getting ready to tear into this.
>>
>> Would you mind posting a small sample of data that you would expect this
>> application to consume.
>>
>> tg
>>
>>
>> Trevor Grant
>> Data Scientist
>> https://github.com/rawkintrevo
>> http://stackexchange.com/users/3002022/rawkintrevo
>> http://trevorgrant.org
>>
>> *"Fortunate is he, who is able to know the causes of things."  -Virgil*
>>
>>
>> On Tue, Apr 18, 2017 at 11:32 PM, KHATWANI PARTH BHARAT <
>> h2016...@pilani.bits-pilani.ac.in> wrote:
>>
>> > @Dmitriy,@Trevor and @Andrew Sir,
>> > I am still stuck at the above problem can you please help me out with
>> it.
>> > I am unable  to find the proper reference to solve the above issue.
>> >
>> > Thanks & Regards
>> > Parth Khatwani
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >   <https://mailtrack.io/> Sent with Mailtrack
>> > <https://mailtrack.io/install?source=signature&lang=en&;
>> > referral=h2016...@pilani.bits-pilani.ac.in&idSignature=22>
>> >
>> > On Sat, Apr 15, 2017 at 10:07 AM, KHATWANI PARTH BHARAT <
>> > h2016...@pilani.bits-pilani.ac.in> wrote:
>> >
>> > > @Dmitriy,
>> > > @Trevor and @Andrew
>> > >
>> > > I have tried
>> > > Testing this Row Key assignment issue which i have mentioned in the
>> above
>> > > mail,
>> > > By Writing the a separate code where i am assigning the a default
>> value 1
>> > > to each row Key of The DRM and then taking the aggregating transpose
>> > > I have committed the separate  test code to the  Github Branch
>> > > <https://github.com/parth2691/Spark_Mahout/tree/Dmitriy-Lyubimov>.
>> > >
>> > > The Code is as follows
>> > >
>> > > val inCoreA = dense((1,1, 2, 3), (1,2, 3, 4), (1,3, 4, 5), (1,4, 5,
>> 6))
>> > >     val A = drmParallelize(m = inCoreA)
>> > >
>> > >     //Mapblock
>> > >     val drm2 = A.mapBlock() {
>> > >       case (keys, block) =>        for(row <- 0 until keys.size) {
>> > >
>> > >          * //assigning 1 to each row index*          keys(row) = 1
>> >   }        (keys, block)    }    prinln("After New Cluster assignment")
>> > println(""+drm2.collect)    val aggTranspose = drm2.t
>> println("Result of
>> > aggregating tranpose")    println(""+aggTranspose.collect)
>> > >
>> > > Out of 1st println After New Cluster assignment should be
>> > > This
>> > > {
>> > >  0 => {0:1.0,    1: 1.0,    2: 1.0,   3: 3.0}
>> > >  1 => {0:1.0,    1: 2.0,    2: 3.0,   3: 4.0}
>> > >  2 => {0:1.0,    1: 3.0,    2: 4.0,   3: 5.0}
>> > >  3 => {0:1.0,    1: 4.0,    2: 5.0,   3: 6.0}
>> > > }
>> > > (Here zeroth Column is used to store the centriod count and column 1,2
>> > and
>> > > 3 Contains Data)
>> > >
>> > > But Turns out to be this
>> > > {
>> > >  0 => {}
>> > >  1 => {0:1.0,1:4.0,2:5.0,3:6.0}
>> > >  2 => {}
>> > >  3 => {}
>> > > }
>> > > And the result of aggregating Transpose should be
>> > > {
>> > >  0 => {1: 4.0}
>> > >  1 => {1: 9.0}
>> > >  2 => {1: 12.0}
>> > >  3 => {1: 15.0}
>> > > }
>> > >
>> > >
>> > >  I have referred to the book written by Andrew And Dmitriy Apache
>> Mahout:
>> > > Beyond MapReduce
>> > > <https://www.amazon.com/Apache-Mahout-MapReduce-
>> > Dmitriy-Lyubimov/dp/1523775785> Aggregating
>> > > Transpose  and other concepts are explained very nicely over here but
>> i
>> > am
>> > > unable to find any example where
>> > > Row Keys are assigned new Values . Mahout Samsara Manual
>> > > http://apache.github.io/mahout/doc/ScalaSparkBindings.html Also Does
>> not
>> > > contain any such examples.
>> > > It will great if i can get some reference to solution of mentioned
>> issue.
>> > >
>> > >
>> > > Thanks
>> > > Parth Khatwani
>> > >
>> > >
>> > >
>> > > On Sat, Apr 15, 2017 at 12:13 AM, Andrew Palumbo <ap....@outlook.com>
>> > > wrote:
>> > >
>> > >> +1
>> > >>
>> > >>
>> > >>
>> > >> Sent from my Verizon Wireless 4G LTE smartphone
>> > >>
>> > >>
>> > >> -------- Original message --------
>> > >> From: Trevor Grant <trevor.d.gr...@gmail.com>
>> > >> Date: 04/14/2017 11:40 (GMT-08:00)
>> > >> To: dev@mahout.apache.org
>> > >> Subject: Re: Trying to write the KMeans Clustering Using "Apache
>> Mahout
>> > >> Samsara"
>> > >>
>> > >> Parth and Dmitriy,
>> > >>
>> > >> This is awesome- as a follow on can we work on getting this rolled
>> in to
>> > >> the algorithms framework?
>> > >>
>> > >> Happy to work with you on this Parth!
>> > >>
>> > >> Trevor Grant
>> > >> Data Scientist
>> > >> https://github.com/rawkintrevo
>> > >> http://stackexchange.com/users/3002022/rawkintrevo
>> > >> http://trevorgrant.org
>> > >>
>> > >> *"Fortunate is he, who is able to know the causes of things."
>> -Virgil*
>> > >>
>> > >>
>> > >> On Fri, Apr 14, 2017 at 1:27 PM, Dmitriy Lyubimov <dlie...@gmail.com
>> >
>> > >> wrote:
>> > >>
>> > >> > i would think reassinging keys should work in most cases.
>> > >> > The only exception is that technically Spark contracts imply that
>> > effect
>> > >> > should be idempotent if task is retried, which might be a problem
>> in a
>> > >> > specific scenario of the object tree coming out from block cache
>> > object
>> > >> > tree, which can stay there and be retried again. but specifically
>> > w.r.t.
>> > >> > this key assignment i don't see any problem since the action
>> obviously
>> > >> > would be idempotent even if this code is run multiple times on the
>> > same
>> > >> > (key, block) pair. This part should be good IMO.
>> > >> >
>> > >> > On Fri, Apr 14, 2017 at 2:26 AM, KHATWANI PARTH BHARAT <
>> > >> > h2016...@pilani.bits-pilani.ac.in> wrote:
>> > >> >
>> > >> > > @Dmitriy Sir,
>> > >> > > In the K means code above I think i am doing the following
>> > Incorrectly
>> > >> > >
>> > >> > > Assigning the closest centriod index to the Row Keys of DRM
>> > >> > >
>> > >> > > //11. Iterating over the Data Matrix(in DrmLike[Int] format) to
>> > >> calculate
>> > >> > > the initial centriods
>> > >> > >     dataDrmX.mapBlock() {
>> > >> > >       case (keys, block) =>
>> > >> > >         for (row <- 0 until block.nrow) {
>> > >> > >           var dataPoint = block(row, ::)
>> > >> > >
>> > >> > >           //12. findTheClosestCentriod find the closest centriod
>> to
>> > >> the
>> > >> > > Data point specified by "dataPoint"
>> > >> > >           val closesetIndex = findTheClosestCentriod(dataPoint,
>> > >> > centriods)
>> > >> > >
>> > >> > >           //13. assigning closest index to key
>> > >> > >           keys(row) = closesetIndex
>> > >> > >         }
>> > >> > >         keys -> block
>> > >> > >     }
>> > >> > >
>> > >> > >  in step 12 i am finding the centriod closest to the current
>> > dataPoint
>> > >> > >  in step13 i am assigning the closesetIndex to the key of the
>> > >> > corresponding
>> > >> > > row represented by the dataPoint
>> > >> > > I think i am doing step13 incorrectly.
>> > >> > >
>> > >> > > Also i am unable to find the proper reference for the same in the
>> > >> > reference
>> > >> > > links which you have mentioned above
>> > >> > >
>> > >> > >
>> > >> > > Thanks & Regards
>> > >> > > Parth Khatwani
>> > >> > >
>> > >> > >
>> > >> > >
>> > >> > >
>> > >> > >
>> > >> > > On Thu, Apr 13, 2017 at 6:24 PM, KHATWANI PARTH BHARAT <
>> > >> > > h2016...@pilani.bits-pilani.ac.in> wrote:
>> > >> > >
>> > >> > > > Dmitriy Sir,
>> > >> > > > I have Created a github branch Github Branch Having Initial
>> Kmeans
>> > >> Code
>> > >> > > > <https://github.com/parth2691/Spark_Mahout/tree/Dmitriy-Lyub
>> imov>
>> > >> > > >
>> > >> > > >
>> > >> > > > Thanks & Regards
>> > >> > > > Parth Khatwani
>> > >> > > >
>> > >> > > > On Thu, Apr 13, 2017 at 3:19 AM, Andrew Palumbo <
>> > ap....@outlook.com
>> > >> >
>> > >> > > > wrote:
>> > >> > > >
>> > >> > > >> +1 to creating a branch.
>> > >> > > >>
>> > >> > > >>
>> > >> > > >>
>> > >> > > >> Sent from my Verizon Wireless 4G LTE smartphone
>> > >> > > >>
>> > >> > > >>
>> > >> > > >> -------- Original message --------
>> > >> > > >> From: Dmitriy Lyubimov <dlie...@gmail.com>
>> > >> > > >> Date: 04/12/2017 11:25 (GMT-08:00)
>> > >> > > >> To: dev@mahout.apache.org
>> > >> > > >> Subject: Re: Trying to write the KMeans Clustering Using
>> "Apache
>> > >> > Mahout
>> > >> > > >> Samsara"
>> > >> > > >>
>> > >> > > >> can't say i can read this code well formatted that way...
>> > >> > > >>
>> > >> > > >> it would seem to me that the code is not using the broadcast
>> > >> variable
>> > >> > > and
>> > >> > > >> instead is using closure variable. that's the only thing i can
>> > >> > > immediately
>> > >> > > >> see by looking in the middle of it.
>> > >> > > >>
>> > >> > > >> it would be better if you created a branch on github for that
>> > code
>> > >> > that
>> > >> > > >> would allow for easy check-outs and comments.
>> > >> > > >>
>> > >> > > >> -d
>> > >> > > >>
>> > >> > > >> On Wed, Apr 12, 2017 at 10:29 AM, KHATWANI PARTH BHARAT <
>> > >> > > >> h2016...@pilani.bits-pilani.ac.in> wrote:
>> > >> > > >>
>> > >> > > >> > @Dmitriy Sir
>> > >> > > >> >
>> > >> > > >> > I have completed the Kmeans code as per the algorithm you
>> have
>> > >> > Outline
>> > >> > > >> > above
>> > >> > > >> >
>> > >> > > >> > My code is as follows
>> > >> > > >> >
>> > >> > > >> > This code works fine till step number 10
>> > >> > > >> >
>> > >> > > >> > In step 11 i am assigning the new centriod index  to
>> > >> corresponding
>> > >> > row
>> > >> > > >> key
>> > >> > > >> > of data Point in the matrix
>> > >> > > >> > I think i am doing something wrong in step 11 may be i am
>> using
>> > >> > > >> incorrect
>> > >> > > >> > syntax
>> > >> > > >> >
>> > >> > > >> > Can you help me find out what am i doing wrong.
>> > >> > > >> >
>> > >> > > >> >
>> > >> > > >> > //start of main method
>> > >> > > >> >
>> > >> > > >> > def main(args: Array[String]) {
>> > >> > > >> >      //1. initialize the spark and mahout context
>> > >> > > >> >     val conf = new SparkConf()
>> > >> > > >> >       .setAppName("DRMExample")
>> > >> > > >> >       .setMaster(args(0))
>> > >> > > >> >       .set("spark.serializer", "org.apache.spark.serializer.
>> > >> > > >> > KryoSerializer")
>> > >> > > >> >       .set("spark.kryo.registrator",
>> > >> > > >> > "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
>> > >> > > >> >     implicit val sc = new SparkDistributedContext(new
>> > >> > > >> SparkContext(conf))
>> > >> > > >> >
>> > >> > > >> >     //2. read the data file and save it in the rdd
>> > >> > > >> >     val lines = sc.textFile(args(1))
>> > >> > > >> >
>> > >> > > >> >     //3. convert data read in as string in to array of
>> double
>> > >> > > >> >     val test = lines.map(line =>
>> line.split('\t').map(_.toDoubl
>> > >> e))
>> > >> > > >> >
>> > >> > > >> >     //4. add a column having value 1 in array of double this
>> > will
>> > >> > > >> > create something like (1 | D)',  which will be used while
>> > >> > calculating
>> > >> > > >> > (1 | D)'
>> > >> > > >> >     val augumentedArray = test.map(addCentriodColumn _)
>> > >> > > >> >
>> > >> > > >> >     //5. convert rdd of array of double in rdd of
>> DenseVector
>> > >> > > >> >     val rdd = augumentedArray.map(dvec(_))
>> > >> > > >> >
>> > >> > > >> >     //6. convert rdd to DrmRdd
>> > >> > > >> >     val rddMatrixLike: DrmRdd[Int] = rdd.zipWithIndex.map {
>> > case
>> > >> (v,
>> > >> > > >> > idx) => (idx.toInt, v) }        //7. convert DrmRdd to
>> > >> > > >> > CheckpointedDrm[Int]    val matrix = drmWrap(rddMatrixLike)
>> > >> //8.
>> > >> > > >> > seperating the column having all ones created in step 4 and
>> > will
>> > >> use
>> > >> > > >> > it later    val oneVector = matrix(::, 0 until 1)
>> //9.
>> > >> final
>> > >> > > >> > input data in DrmLike[Int] format    val dataDrmX =
>> matrix(::,
>> > 1
>> > >> > until
>> > >> > > >> > 4)            //9. Sampling to select initial centriods
>> val
>> > >> > > >> > centriods = drmSampleKRows(dataDrmX, 2, false)
>> > centriods.size
>> > >> > > >> > //10. Broad Casting the initial centriods    val
>> > broadCastMatrix
>> > >> =
>> > >> > > >> > drmBroadcast(centriods)            //11. Iterating over the
>> > Data
>> > >> > > >> > Matrix(in DrmLike[Int] format) to calculate the initial
>> > centriods
>> > >> > > >> > dataDrmX.mapBlock() {      case (keys, block) =>        for
>> > (row
>> > >> <-
>> > >> > 0
>> > >> > > >> > until block.nrow) {          var dataPoint = block(row, ::)
>> > >> > > >> >         //12. findTheClosestCentriod find the closest
>> centriod
>> > to
>> > >> > the
>> > >> > > >> > Data point specified by "dataPoint"          val
>> closesetIndex
>> > =
>> > >> > > >> > findTheClosestCentriod(dataPoint, centriods)
>> > >> > //13.
>> > >> > > >> > assigning closest index to key          keys(row) =
>> > closesetIndex
>> > >> > > >> >   }        keys -> block    }
>> > >> > > >> >
>> > >> > > >> >     //14. Calculating the (1|D)      val b = (oneVector
>> cbind
>> > >> > > >> > dataDrmX)        //15. Aggregating Transpose (1|D)'    val
>> > >> > bTranspose
>> > >> > > >> > = (oneVector cbind dataDrmX).t    // after step 15
>> bTranspose
>> > >> will
>> > >> > > >> > have data in the following format        /*(n+1)*K where
>> > >> n=dimension
>> > >> > > >> > of the data point, K=number of clusters    * zeroth row will
>> > >> contain
>> > >> > > >> > the count of points assigned to each cluster    * assuming
>> 3d
>> > >> data
>> > >> > > >> > points     *     */
>> > >> > > >> >
>> > >> > > >> >
>> > >> > > >> >     val nrows = b.nrow.toInt    //16. slicing the count
>> vectors
>> > >> out
>> > >> > > >> >  val pointCountVectors = drmBroadcast(b(0 until 1,
>> > ::).collect(0,
>> > >> > ::))
>> > >> > > >> >    val vectorSums = b(1 until nrows, ::)    //17. dividing
>> the
>> > >> data
>> > >> > > >> > point by count vector    vectorSums.mapBlock() {      case
>> > (keys,
>> > >> > > >> > block) =>        for (row <- 0 until block.nrow) {
>> > >> > block(row,
>> > >> > > >> > ::) /= pointCountVectors        }        keys -> block    }
>> > >> //18.
>> > >> > > >> > seperating the count vectors    val newCentriods =
>> > >> vectorSums.t(::,1
>> > >> > > >> > until centriods.size)            //19. iterate over the
>> above
>> > >> code
>> > >> > > >> > till convergence criteria is meet   }//end of main method
>> > >> > > >> >
>> > >> > > >> >
>> > >> > > >> >
>> > >> > > >> >   // method to find the closest centriod to data point( vec:
>> > >> Vector
>> > >> > > >> > in the arguments)  def findTheClosestCentriod(vec: Vector,
>> > >> matrix:
>> > >> > > >> > Matrix): Int = {
>> > >> > > >> >     var index = 0
>> > >> > > >> >     var closest = Double.PositiveInfinity
>> > >> > > >> >     for (row <- 0 until matrix.nrow) {
>> > >> > > >> >       val squaredSum = ssr(vec, matrix(row, ::))
>> > >> > > >> >       val tempDist = Math.sqrt(ssr(vec, matrix(row, ::)))
>> > >> > > >> >       if (tempDist < closest) {
>> > >> > > >> >         closest = tempDist
>> > >> > > >> >         index = row
>> > >> > > >> >       }
>> > >> > > >> >     }
>> > >> > > >> >     index
>> > >> > > >> >   }
>> > >> > > >> >
>> > >> > > >> >    //calculating the sum of squared distance between the
>> > >> > > points(Vectors)
>> > >> > > >> >   def ssr(a: Vector, b: Vector): Double = {
>> > >> > > >> >     (a - b) ^= 2 sum
>> > >> > > >> >   }
>> > >> > > >> >
>> > >> > > >> >   //method used to create (1|D)
>> > >> > > >> >   def addCentriodColumn(arg: Array[Double]): Array[Double]
>> = {
>> > >> > > >> >     val newArr = new Array[Double](arg.length + 1)
>> > >> > > >> >     newArr(0) = 1.0;
>> > >> > > >> >     for (i <- 0 until (arg.size)) {
>> > >> > > >> >       newArr(i + 1) = arg(i);
>> > >> > > >> >     }
>> > >> > > >> >     newArr
>> > >> > > >> >   }
>> > >> > > >> >
>> > >> > > >> >
>> > >> > > >> > Thanks & Regards
>> > >> > > >> > Parth Khatwani
>> > >> > > >> >
>> > >> > > >> >
>> > >> > > >> >
>> > >> > > >> > On Mon, Apr 3, 2017 at 7:37 PM, KHATWANI PARTH BHARAT <
>> > >> > > >> > h2016...@pilani.bits-pilani.ac.in> wrote:
>> > >> > > >> >
>> > >> > > >> > >
>> > >> > > >> > > ---------- Forwarded message ----------
>> > >> > > >> > > From: Dmitriy Lyubimov <dlie...@gmail.com>
>> > >> > > >> > > Date: Fri, Mar 31, 2017 at 11:34 PM
>> > >> > > >> > > Subject: Re: Trying to write the KMeans Clustering Using
>> > >> "Apache
>> > >> > > >> Mahout
>> > >> > > >> > > Samsara"
>> > >> > > >> > > To: "dev@mahout.apache.org" <dev@mahout.apache.org>
>> > >> > > >> > >
>> > >> > > >> > >
>> > >> > > >> > > ps1 this assumes row-wise construction of A based on
>> training
>> > >> set
>> > >> > > of m
>> > >> > > >> > > n-dimensional points.
>> > >> > > >> > > ps2 since we are doing multiple passes over A it may make
>> > >> sense to
>> > >> > > >> make
>> > >> > > >> > > sure it is committed to spark cache (by using checkpoint
>> > api),
>> > >> if
>> > >> > > >> spark
>> > >> > > >> > is
>> > >> > > >> > > used
>> > >> > > >> > >
>> > >> > > >> > > On Fri, Mar 31, 2017 at 10:53 AM, Dmitriy Lyubimov <
>> > >> > > dlie...@gmail.com
>> > >> > > >> >
>> > >> > > >> > > wrote:
>> > >> > > >> > >
>> > >> > > >> > > > here is the outline. For details of APIs, please refer
>> to
>> > >> > samsara
>> > >> > > >> > manual
>> > >> > > >> > > > [2], i will not be be repeating it.
>> > >> > > >> > > >
>> > >> > > >> > > > Assume your training data input is m x n matrix A. For
>> > >> > simplicity
>> > >> > > >> let's
>> > >> > > >> > > > assume it's a DRM with int row keys, i.e., DrmLike[Int].
>> > >> > > >> > > >
>> > >> > > >> > > > Initialization:
>> > >> > > >> > > >
>> > >> > > >> > > > First, classic k-means starts by selecting initial
>> > clusters,
>> > >> by
>> > >> > > >> > sampling
>> > >> > > >> > > > them out. You can do that by using sampling api [1],
>> thus
>> > >> > forming
>> > >> > > a
>> > >> > > >> k
>> > >> > > >> > x n
>> > >> > > >> > > > in-memory matrix C (current centroids). C is therefore
>> of
>> > >> > Mahout's
>> > >> > > >> > Matrix
>> > >> > > >> > > > type.
>> > >> > > >> > > >
>> > >> > > >> > > > You the proceed by alternating between cluster
>> assignments
>> > >> and
>> > >> > > >> > > > recompupting centroid matrix C till convergence based on
>> > some
>> > >> > test
>> > >> > > >> or
>> > >> > > >> > > > simply limited by epoch count budget, your choice.
>> > >> > > >> > > >
>> > >> > > >> > > > Cluster assignments: here, we go over current generation
>> > of A
>> > >> > and
>> > >> > > >> > > > recompute centroid indexes for each row in A. Once we
>> > >> recompute
>> > >> > > >> index,
>> > >> > > >> > we
>> > >> > > >> > > > put it into the row key . You can do that by assigning
>> > >> centroid
>> > >> > > >> indices
>> > >> > > >> > > to
>> > >> > > >> > > > keys of A using operator mapblock() (details in [2],
>> [3],
>> > >> [4]).
>> > >> > > You
>> > >> > > >> > also
>> > >> > > >> > > > need to broadcast C in order to be able to access it in
>> > >> > efficient
>> > >> > > >> > manner
>> > >> > > >> > > > inside mapblock() closure. Examples of that are plenty
>> > given
>> > >> in
>> > >> > > [2].
>> > >> > > >> > > > Essentially, in mapblock, you'd reform the row keys to
>> > >> reflect
>> > >> > > >> cluster
>> > >> > > >> > > > index in C. while going over A, you'd have a "nearest
>> > >> neighbor"
>> > >> > > >> problem
>> > >> > > >> > > to
>> > >> > > >> > > > solve for the row of A and centroids C. This is the
>> bulk of
>> > >> > > >> computation
>> > >> > > >> > > > really, and there are a few tricks there that can speed
>> > this
>> > >> > step
>> > >> > > >> up in
>> > >> > > >> > > > both exact and approximate manner, but you can start
>> with a
>> > >> > naive
>> > >> > > >> > search.
>> > >> > > >> > > >
>> > >> > > >> > > > Centroid recomputation:
>> > >> > > >> > > > once you assigned centroids to the keys of marix A,
>> you'd
>> > >> want
>> > >> > to
>> > >> > > >> do an
>> > >> > > >> > > > aggregating transpose of A to compute essentially
>> average
>> > of
>> > >> > row A
>> > >> > > >> > > grouped
>> > >> > > >> > > > by the centroid key. The trick is to do a computation of
>> > >> (1|A)'
>> > >> > > >> which
>> > >> > > >> > > will
>> > >> > > >> > > > results in a matrix of the shape (Counts/sums of cluster
>> > >> rows).
>> > >> > > >> This is
>> > >> > > >> > > the
>> > >> > > >> > > > part i find difficult to explain without a latex
>> graphics.
>> > >> > > >> > > >
>> > >> > > >> > > > In Samsara, construction of (1|A)' corresponds to DRM
>> > >> expression
>> > >> > > >> > > >
>> > >> > > >> > > > (1 cbind A).t (again, see [2]).
>> > >> > > >> > > >
>> > >> > > >> > > > So when you compute, say,
>> > >> > > >> > > >
>> > >> > > >> > > > B = (1 | A)',
>> > >> > > >> > > >
>> > >> > > >> > > > then B is (n+1) x k, so each column contains a vector
>> > >> > > corresponding
>> > >> > > >> to
>> > >> > > >> > a
>> > >> > > >> > > > cluster 1..k. In such column, the first element would
>> be #
>> > of
>> > >> > > >> points in
>> > >> > > >> > > the
>> > >> > > >> > > > cluster, and the rest of it would correspond to sum of
>> all
>> > >> > points.
>> > >> > > >> So
>> > >> > > >> > in
>> > >> > > >> > > > order to arrive to an updated matrix C, we need to
>> collect
>> > B
>> > >> > into
>> > >> > > >> > memory,
>> > >> > > >> > > > and slice out counters (first row) from the rest of it.
>> > >> > > >> > > >
>> > >> > > >> > > > So, to compute C:
>> > >> > > >> > > >
>> > >> > > >> > > > C <- B (2:,:) each row divided by B(1,:)
>> > >> > > >> > > >
>> > >> > > >> > > > (watch out for empty clusters with 0 elements, this will
>> > >> cause
>> > >> > > lack
>> > >> > > >> of
>> > >> > > >> > > > convergence and NaNs in the newly computed C).
>> > >> > > >> > > >
>> > >> > > >> > > > This operation obviously uses subblocking and row-wise
>> > >> iteration
>> > >> > > >> over
>> > >> > > >> > B,
>> > >> > > >> > > > for which i am again making reference to [2].
>> > >> > > >> > > >
>> > >> > > >> > > >
>> > >> > > >> > > > [1] https://github.com/apache/
>> > mahout/blob/master/math-scala/
>> > >> > > >> > > > src/main/scala/org/apache/maho
>> ut/math/drm/package.scala#
>> > L149
>> > >> > > >> > > >
>> > >> > > >> > > > [2], Sasmara manual, a bit dated but viable,
>> > >> > http://apache.github
>> > >> > > .
>> > >> > > >> > > > io/mahout/doc/ScalaSparkBindings.html
>> > >> > > >> > > >
>> > >> > > >> > > > [3] scaladoc, again, dated but largely viable for the
>> > >> purpose of
>> > >> > > >> this
>> > >> > > >> > > > exercise:
>> > >> > > >> > > > http://apache.github.io/mahout/0.10.1/docs/mahout-math-
>> > >> > > >> scala/index.htm
>> > >> > > >> > > >
>> > >> > > >> > > > [4] mapblock etc. http://apache.github.io/mahout
>> > >> > > >> /0.10.1/docs/mahout-
>> > >> > > >> > > > math-scala/index.html#org.apache.mahout.math.drm.
>> > RLikeDrmOps
>> > >> > > >> > > >
>> > >> > > >> > > > On Fri, Mar 31, 2017 at 9:54 AM, KHATWANI PARTH BHARAT <
>> > >> > > >> > > > h2016...@pilani.bits-pilani.ac.in> wrote:
>> > >> > > >> > > >
>> > >> > > >> > > >> @Dmitriycan you please again tell me the approach to
>> move
>> > >> > ahead.
>> > >> > > >> > > >>
>> > >> > > >> > > >>
>> > >> > > >> > > >> Thanks
>> > >> > > >> > > >> Parth Khatwani
>> > >> > > >> > > >>
>> > >> > > >> > > >>
>> > >> > > >> > > >> On Fri, Mar 31, 2017 at 10:15 PM, KHATWANI PARTH
>> BHARAT <
>> > >> > > >> > > >> h2016...@pilani.bits-pilani.ac.in> wrote:
>> > >> > > >> > > >>
>> > >> > > >> > > >> > yes i am unable to figure out the way ahead.
>> > >> > > >> > > >> > Like how to create the augmented matrix A := (0|D)
>> which
>> > >> you
>> > >> > > have
>> > >> > > >> > > >> > mentioned.
>> > >> > > >> > > >> >
>> > >> > > >> > > >> >
>> > >> > > >> > > >> > On Fri, Mar 31, 2017 at 10:10 PM, Dmitriy Lyubimov <
>> > >> > > >> > dlie...@gmail.com
>> > >> > > >> > > >
>> > >> > > >> > > >> > wrote:
>> > >> > > >> > > >> >
>> > >> > > >> > > >> >> was my reply for your post on @user has been a bit
>> > >> > confusing?
>> > >> > > >> > > >> >>
>> > >> > > >> > > >> >> On Fri, Mar 31, 2017 at 8:40 AM, KHATWANI PARTH
>> BHARAT
>> > <
>> > >> > > >> > > >> >> h2016...@pilani.bits-pilani.ac.in> wrote:
>> > >> > > >> > > >> >>
>> > >> > > >> > > >> >> > Sir,
>> > >> > > >> > > >> >> > I am trying to write the kmeans clustering
>> algorithm
>> > >> using
>> > >> > > >> Mahout
>> > >> > > >> > > >> >> Samsara
>> > >> > > >> > > >> >> > but i am bit confused
>> > >> > > >> > > >> >> > about how to leverage Distributed Row Matrix for
>> the
>> > >> same.
>> > >> > > Can
>> > >> > > >> > > >> anybody
>> > >> > > >> > > >> >> help
>> > >> > > >> > > >> >> > me with same.
>> > >> > > >> > > >> >> >
>> > >> > > >> > > >> >> >
>> > >> > > >> > > >> >> >
>> > >> > > >> > > >> >> >
>> > >> > > >> > > >> >> >
>> > >> > > >> > > >> >> > Thanks
>> > >> > > >> > > >> >> > Parth Khatwani
>> > >> > > >> > > >> >> >
>> > >> > > >> > > >> >>
>> > >> > > >> > > >> >
>> > >> > > >> > > >> >
>> > >> > > >> > > >>
>> > >> > > >> > > >
>> > >> > > >> > > >
>> > >> > > >> > >
>> > >> > > >> > >
>> > >> > > >> >
>> > >> > > >>
>> > >> > > >
>> > >> > > >
>> > >> > >
>> > >> >
>> > >>
>> > >
>> > >
>> >
>>
>
>

Reply via email to