There appears to be a bug in the aggregating semantics of the Spark
transposition operator. It shows up when the same cluster (key) is present
more than once in the same block. The fix is one character long
(plus a better test for aggregation).
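
For reference, here is a minimal sketch of the aggregation one would expect, written with plain Scala collections rather than Mahout. The helper name aggregateByKey and the object name are made up for illustration and are not part of any Mahout API. Rows sharing a key are summed element-wise, even when the duplicates sit in the same block. The four rows are the ones from the test code quoted below, all reassigned to key 1.

```scala
// Plain-Scala sketch (not Mahout code): the aggregation an aggregating
// transpose is expected to apply to rows that share a key.
object AggregatingTransposeSketch {
  type Row = Vector[Double]

  // Hypothetical helper: sums, element-wise, all rows that carry the same key.
  def aggregateByKey(block: Seq[(Int, Row)]): Map[Int, Row] =
    block.groupBy(_._1).map { case (key, rows) =>
      key -> rows.map(_._2).reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
    }

  def main(args: Array[String]): Unit = {
    // One block in which every row has been reassigned to cluster key 1.
    val block = Seq(
      1 -> Vector(1.0, 1.0, 2.0, 3.0),
      1 -> Vector(1.0, 2.0, 3.0, 4.0),
      1 -> Vector(1.0, 3.0, 4.0, 5.0),
      1 -> Vector(1.0, 4.0, 5.0, 6.0)
    )
    // prints Map(1 -> Vector(4.0, 10.0, 14.0, 18.0))
    println(aggregateByKey(block))
  }
}
```

The first component of the summed row is the point count for the cluster, and the remaining components are the per-dimension sums, which is what the (1|A)' trick discussed further down the thread relies on.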



On Fri, Apr 21, 2017 at 1:06 PM, KHATWANI PARTH BHARAT <
h2016...@pilani.bits-pilani.ac.in> wrote:

> One is the cluster ID, i.e. the index of the cluster to which the data point
> should be assigned.
> This is as per chapter 4, on the aggregating transpose, of the book
> Apache Mahout: Beyond MapReduce by Dmitriy Lyubimov
> <http://www.amazon.in/Apache-Mahout-Mapreduce-Dmitriy-Lyubimov/dp/1523775785>.
> From what I have understood, rows having the same key will be added together
> when we take the aggregating transpose of the matrix.
> So I think there should be a way to assign new values to row keys, and I
> think Dmitriy has also mentioned the same thing in the approach he
> outlined in this mail chain.
> Correct me if I am wrong.
>
>
> Thanks
> Parth Khatwani
>
>
>
>
>
>
> On Sat, Apr 22, 2017 at 1:54 AM, Trevor Grant <trevor.d.gr...@gmail.com>
> wrote:
>
> > Got it- in short no.
> >
> > Think of the keys like a dictionary or HashMap.
> >
> > That's why everything is ending up on row 1.
> >
> > What are you trying to achieve by creating keys of 1?
> >
> > Trevor Grant
> > Data Scientist
> > https://github.com/rawkintrevo
> > http://stackexchange.com/users/3002022/rawkintrevo
> > http://trevorgrant.org
> >
> > *"Fortunate is he, who is able to know the causes of things."  -Virgil*
> >
> >
> > On Fri, Apr 21, 2017 at 2:26 PM, KHATWANI PARTH BHARAT <
> > h2016...@pilani.bits-pilani.ac.in> wrote:
> >
> > > @Trevor
> > >
> > >
> > >
> > > I was trying to write "*Kmeans*" using Mahout DRM as per the algorithm
> > > outlined by Dmitriy.
> > > I was facing the problem of assigning cluster IDs to the row keys.
> > > For example, consider the matrix below, where columns 1 to 3 hold the
> > > data points and column 0 contains the count of the point:
> > > {
> > >  0 => {0:1.0,    1: 1.0,    2: 1.0,   3: 3.0}
> > >  1 => {0:1.0,    1: 2.0,    2: 3.0,   3: 4.0}
> > >  2 => {0:1.0,    1: 3.0,    2: 4.0,   3: 5.0}
> > >  3 => {0:1.0,    1: 4.0,    2: 5.0,   3: 6.0}
> > > }
> > >
> > > Now, after calculating the centroid which is closest to each data point,
> > > I am trying to assign the centroid index to the *row key*.
> > >
> > > Suppose that every data point is assigned to the centroid at index 1,
> > > so key = 1 is assigned to each and every row
> > >
> > > using the code below:
> > >
> > >  val drm2 = A.mapBlock() {
> > >    case (keys, block) =>
> > >      for (row <- 0 until keys.size) {
> > >        // assigning 1 to each row index
> > >        keys(row) = 1
> > >      }
> > >      (keys, block)
> > >  }
> > >
> > >
> > >
> > > I want above matrix to be in this form
> > >
> > >
> > > {
> > >  1 => {0:1.0,    1: 1.0,    2: 1.0,   3: 3.0}
> > >  1 => {0:1.0,    1: 2.0,    2: 3.0,   3: 4.0}
> > >  1 => {0:1.0,    1: 3.0,    2: 4.0,   3: 5.0}
> > >  1 => {0:1.0,    1: 4.0,    2: 5.0,   3: 6.0}
> > > }
> > >
> > >
> > >
> > >
> > >  Turns out to be this
> > > {
> > >  0 => {}
> > >  1 => {0:1.0,1:4.0,2:5.0,3:6.0}
> > >  2 => {}
> > >  3 => {}
> > > }
> > >
> > >
> > >
> > > I am confused whether assigning the new key values to the row index is
> > > done through the following line of code
> > >
> > >     // assigning 1 to each row index
> > >     keys(row) = 1
> > >
> > > or whether there is some other way.
> > >
> > >
> > >
> > > I am not able to find any useful links or references on the internet;
> > > even Andrew and Dmitriy's book does not have a proper reference for the
> > > above-mentioned issue.
> > >
> > >
> > >
> > > Thanks & Regards
> > > Parth Khatwani
> > >
> > >
> > >
> > > On Fri, Apr 21, 2017 at 10:06 PM, Trevor Grant <trevor.d.gr...@gmail.com>
> > > wrote:
> > >
> > > > OK, I dug into this before I read your question carefully; that was my
> > > > bad.
> > > >
> > > > Assuming you want the aggregate transpose of :
> > > > {
> > > >  0 => {0:1.0,    1: 1.0,    2: 1.0,   3: 3.0}
> > > >  1 => {0:1.0,    1: 2.0,    2: 3.0,   3: 4.0}
> > > >  2 => {0:1.0,    1: 3.0,    2: 4.0,   3: 5.0}
> > > >  3 => {0:1.0,    1: 4.0,    2: 5.0,   3: 6.0}
> > > > }
> > > >
> > > > to be
> > > > {
> > > >  0 => {1: 5.0}   // (not 4.0) // and 6.0 in your example...
> > > >  1 => {1: 9.0}
> > > >  2 => {1: 12.0}
> > > >  3 => {1: 15.0}
> > > > }
> > > >
> > > >
> > > > Then why not replace the mapBlock statement as follows:
> > > >
> > > > val drm2 = (A(::, 1 until 4) cbind 0.0).mapBlock() {
> > > >   case (keys, block) =>
> > > >     for(row <- 0 until block.nrow) block(row, 3) = block(row, ::).sum
> > > >     (keys, block)
> > > > }
> > > > val aggTranspose = drm2(::, 3 until 4).t
> > > > println("Result of aggregating tranpose")
> > > > println(""+aggTranspose.collect)
> > > >
> > > > Where we are creating an empty column, then filling it with the row sums.
> > > >
> > > > A distributed rowSums fn would be nice for just such an occasion... sigh
> > > >
> > > > Let me know if that gets you going again.  That was simpler than I
> > > > thought- sorry for delay on this.
> > > >
> > > > PS
> > > > Candidly, I didn't explore further once I understood the question, but if
> > > > you are going to collect this to the driver anyway (not sure if that is
> > > > the case)
> > > > A(::, 1 until 4).rowSums would also work.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > Trevor Grant
> > > > Data Scientist
> > > > https://github.com/rawkintrevo
> > > > http://stackexchange.com/users/3002022/rawkintrevo
> > > > http://trevorgrant.org
> > > >
> > > > *"Fortunate is he, who is able to know the causes of things."  -Virgil*
> > > >
> > > >
> > > > On Thu, Apr 20, 2017 at 9:01 PM, KHATWANI PARTH BHARAT <
> > > > h2016...@pilani.bits-pilani.ac.in> wrote:
> > > >
> > > > > @Trevor Sir,
> > > > > I have attached the sample data file, and here is the link to the
> > > > > complete data file
> > > > > <https://drive.google.com/open?id=0Bxnnu_Ig2Et9QjZoM3dmY1V5WXM>.
> > > > >
> > > > >
> > > > > Following is the link for the Github Branch For the code
> > > > > https://github.com/parth2691/Spark_Mahout/tree/Dmitriy-Lyubimov
> > > > >
> > > > > KmeansMahout.scala
> > > > > <https://github.com/parth2691/Spark_Mahout/blob/Dmitriy-Lyubimov/KmeansMahout.scala>
> > > > > is the complete code.
> > > > >
> > > > >
> > > > > I have also made a sample program just to test assigning new values
> > > > > to the row keys of the matrix and the aggregating transpose. I think
> > > > > assigning new values to the row keys and the aggregating transpose is
> > > > > causing the main problem in the Kmeans code.
> > > > > Following is the link to the GitHub repo for this code:
> > > > > TestClusterAssign.scala
> > > > > <https://github.com/parth2691/Spark_Mahout/blob/Dmitriy-Lyubimov/TestClusterAssign.scala>
> > > > >
> > > > > The above code contains hard-coded data. Following are the expected
> > > > > and the actual output of the above code.
> > > > > Output of the 1st println, "After New Cluster assignment", should be
> > > > > this:
> > > > > {
> > > > >  0 => {0:1.0,    1: 1.0,    2: 1.0,   3: 3.0}
> > > > >  1 => {0:1.0,    1: 2.0,    2: 3.0,   3: 4.0}
> > > > >  2 => {0:1.0,    1: 3.0,    2: 4.0,   3: 5.0}
> > > > >  3 => {0:1.0,    1: 4.0,    2: 5.0,   3: 6.0}
> > > > > }
> > > > > (Here the zeroth column is used to store the centroid count, and
> > > > > columns 1, 2 and 3 contain the data)
> > > > >
> > > > > But Turns out to be this
> > > > > {
> > > > >  0 => {}
> > > > >  1 => {0:1.0,1:4.0,2:5.0,3:6.0}
> > > > >  2 => {}
> > > > >  3 => {}
> > > > > }
> > > > > And the result of aggregating Transpose should be
> > > > > {
> > > > >  0 => {1: 4.0}
> > > > >  1 => {1: 9.0}
> > > > >  2 => {1: 12.0}
> > > > >  3 => {1: 15.0}
> > > > > }
> > > > >
> > > > >
> > > > > Thanks Trevor for such a great Help
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > Best Regards
> > > > > Parth
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Apr 21, 2017 at 4:20 AM, Trevor Grant <trevor.d.gr...@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> Hey
> > > > >>
> > > > >> Sorry for delay- was getting ready to tear into this.
> > > > >>
> > > > > >> Would you mind posting a small sample of data that you would expect
> > > > > >> this application to consume?
> > > > >>
> > > > >> tg
> > > > >>
> > > > >>
> > > > >> Trevor Grant
> > > > >> Data Scientist
> > > > >> https://github.com/rawkintrevo
> > > > >> http://stackexchange.com/users/3002022/rawkintrevo
> > > > >> http://trevorgrant.org
> > > > >>
> > > > >> *"Fortunate is he, who is able to know the causes of things."
> > > -Virgil*
> > > > >>
> > > > >>
> > > > >> On Tue, Apr 18, 2017 at 11:32 PM, KHATWANI PARTH BHARAT <
> > > > >> h2016...@pilani.bits-pilani.ac.in> wrote:
> > > > >>
> > > > > >> > @Dmitriy, @Trevor and @Andrew Sir,
> > > > > >> > I am still stuck on the above problem. Can you please help me out
> > > > > >> > with it?
> > > > > >> > I am unable to find a proper reference to solve the above issue.
> > > > >> >
> > > > >> > Thanks & Regards
> > > > >> > Parth Khatwani
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> >   <https://mailtrack.io/> Sent with Mailtrack
> > > > >> > <https://mailtrack.io/install?source=signature&lang=en&;
> > > > >> > referral=h2016...@pilani.bits-pilani.ac.in&idSignature=22>
> > > > >> >
> > > > >> > On Sat, Apr 15, 2017 at 10:07 AM, KHATWANI PARTH BHARAT <
> > > > >> > h2016...@pilani.bits-pilani.ac.in> wrote:
> > > > >> >
> > > > >> > > @Dmitriy,
> > > > >> > > @Trevor and @Andrew
> > > > >> > >
> > > > > >> > > I have tried testing the row key assignment issue which I
> > > > > >> > > mentioned in the above mail by writing separate code in which I
> > > > > >> > > assign a default value of 1 to each row key of the DRM and then
> > > > > >> > > take the aggregating transpose.
> > > > > >> > > I have committed the separate test code to the GitHub branch
> > > > > >> > > <https://github.com/parth2691/Spark_Mahout/tree/Dmitriy-Lyubimov>.
> > > > >> > >
> > > > >> > > The Code is as follows
> > > > >> > >
> > > > > >> > > val inCoreA = dense((1, 1, 2, 3), (1, 2, 3, 4), (1, 3, 4, 5), (1, 4, 5, 6))
> > > > > >> > > val A = drmParallelize(m = inCoreA)
> > > > > >> > >
> > > > > >> > > //Mapblock
> > > > > >> > > val drm2 = A.mapBlock() {
> > > > > >> > >   case (keys, block) =>
> > > > > >> > >     for (row <- 0 until keys.size) {
> > > > > >> > >       // assigning 1 to each row index
> > > > > >> > >       keys(row) = 1
> > > > > >> > >     }
> > > > > >> > >     (keys, block)
> > > > > >> > > }
> > > > > >> > > println("After New Cluster assignment")
> > > > > >> > > println("" + drm2.collect)
> > > > > >> > > val aggTranspose = drm2.t
> > > > > >> > > println("Result of aggregating tranpose")
> > > > > >> > > println("" + aggTranspose.collect)
> > > > >> > >
> > > > >> > > Out of 1st println After New Cluster assignment should be
> > > > >> > > This
> > > > >> > > {
> > > > >> > >  0 => {0:1.0,    1: 1.0,    2: 1.0,   3: 3.0}
> > > > >> > >  1 => {0:1.0,    1: 2.0,    2: 3.0,   3: 4.0}
> > > > >> > >  2 => {0:1.0,    1: 3.0,    2: 4.0,   3: 5.0}
> > > > >> > >  3 => {0:1.0,    1: 4.0,    2: 5.0,   3: 6.0}
> > > > >> > > }
> > > > > >> > > (Here the zeroth column is used to store the centroid count, and
> > > > > >> > > columns 1, 2 and 3 contain the data)
> > > > >> > >
> > > > >> > > But Turns out to be this
> > > > >> > > {
> > > > >> > >  0 => {}
> > > > >> > >  1 => {0:1.0,1:4.0,2:5.0,3:6.0}
> > > > >> > >  2 => {}
> > > > >> > >  3 => {}
> > > > >> > > }
> > > > >> > > And the result of aggregating Transpose should be
> > > > >> > > {
> > > > >> > >  0 => {1: 4.0}
> > > > >> > >  1 => {1: 9.0}
> > > > >> > >  2 => {1: 12.0}
> > > > >> > >  3 => {1: 15.0}
> > > > >> > > }
> > > > >> > >
> > > > >> > >
> > > > >> > >  I have referred to the book written by Andrew And Dmitriy
> > Apache
> > > > >> Mahout:
> > > > >> > > Beyond MapReduce
> > > > >> > > <https://www.amazon.com/Apache-Mahout-MapReduce-
> > > > >> > Dmitriy-Lyubimov/dp/1523775785> Aggregating
> > > > >> > > Transpose  and other concepts are explained very nicely over
> > here
> > > > but
> > > > >> i
> > > > >> > am
> > > > >> > > unable to find any example where
> > > > >> > > Row Keys are assigned new Values . Mahout Samsara Manual
> > > > >> > > http://apache.github.io/mahout/doc/ScalaSparkBindings.html
> Also
> > > > Does
> > > > >> not
> > > > >> > > contain any such examples.
> > > > >> > > It will great if i can get some reference to solution of
> > mentioned
> > > > >> issue.
> > > > >> > >
> > > > >> > >
> > > > >> > > Thanks
> > > > >> > > Parth Khatwani
> > > > >> > >
> > > > >> > >
> > > > >> > >
> > > > > >> > > On Sat, Apr 15, 2017 at 12:13 AM, Andrew Palumbo <ap....@outlook.com>
> > > > > >> > > wrote:
> > > > >> > >
> > > > >> > >> +1
> > > > >> > >>
> > > > >> > >>
> > > > >> > >>
> > > > >> > >>
> > > > >> > >>
> > > > >> > >> -------- Original message --------
> > > > >> > >> From: Trevor Grant <trevor.d.gr...@gmail.com>
> > > > >> > >> Date: 04/14/2017 11:40 (GMT-08:00)
> > > > >> > >> To: dev@mahout.apache.org
> > > > > >> > >> Subject: Re: Trying to write the KMeans Clustering Using "Apache Mahout Samsara"
> > > > > >> > >>
> > > > > >> > >> Parth and Dmitriy,
> > > > > >> > >>
> > > > > >> > >> This is awesome- as a follow on can we work on getting this
> > > > > >> > >> rolled in to the algorithms framework?
> > > > >> > >>
> > > > >> > >> Happy to work with you on this Parth!
> > > > >> > >>
> > > > >> > >> Trevor Grant
> > > > >> > >> Data Scientist
> > > > >> > >> https://github.com/rawkintrevo
> > > > >> > >> http://stackexchange.com/users/3002022/rawkintrevo
> > > > >> > >> http://trevorgrant.org
> > > > >> > >>
> > > > >> > >> *"Fortunate is he, who is able to know the causes of things."
> > > > >> -Virgil*
> > > > >> > >>
> > > > >> > >>
> > > > > >> > >> On Fri, Apr 14, 2017 at 1:27 PM, Dmitriy Lyubimov <dlie...@gmail.com>
> > > > > >> > >> wrote:
> > > > >> > >>
> > > > > >> > >> > i would think reassigning keys should work in most cases.
> > > > > >> > >> > The only exception is that technically Spark contracts imply
> > > > > >> > >> > that the effect should be idempotent if a task is retried,
> > > > > >> > >> > which might be a problem in a specific scenario of the object
> > > > > >> > >> > tree coming out from block cache object tree, which can stay
> > > > > >> > >> > there and be retried again. but specifically w.r.t. this key
> > > > > >> > >> > assignment i don't see any problem since the action obviously
> > > > > >> > >> > would be idempotent even if this code is run multiple times on
> > > > > >> > >> > the same (key, block) pair. This part should be good IMO.
> > > > >> > >> >
> > > > >> > >> > On Fri, Apr 14, 2017 at 2:26 AM, KHATWANI PARTH BHARAT <
> > > > >> > >> > h2016...@pilani.bits-pilani.ac.in> wrote:
> > > > >> > >> >
> > > > > >> > >> > > @Dmitriy Sir,
> > > > > >> > >> > > In the K-means code above, I think I am doing the following
> > > > > >> > >> > > incorrectly:
> > > > > >> > >> > >
> > > > > >> > >> > > assigning the closest centroid index to the row keys of the DRM
> > > > > >> > >> > >
> > > > > >> > >> > > //11. Iterating over the Data Matrix(in DrmLike[Int] format) to calculate
> > > > > >> > >> > > // the initial centriods
> > > > > >> > >> > >     dataDrmX.mapBlock() {
> > > > > >> > >> > >       case (keys, block) =>
> > > > > >> > >> > >         for (row <- 0 until block.nrow) {
> > > > > >> > >> > >           var dataPoint = block(row, ::)
> > > > > >> > >> > >
> > > > > >> > >> > >           //12. findTheClosestCentriod find the closest centriod to the
> > > > > >> > >> > >           // Data point specified by "dataPoint"
> > > > > >> > >> > >           val closesetIndex = findTheClosestCentriod(dataPoint, centriods)
> > > > > >> > >> > >
> > > > > >> > >> > >           //13. assigning closest index to key
> > > > > >> > >> > >           keys(row) = closesetIndex
> > > > > >> > >> > >         }
> > > > > >> > >> > >         keys -> block
> > > > > >> > >> > >     }
> > > > > >> > >> > >
> > > > > >> > >> > > In step 12 I am finding the centroid closest to the current
> > > > > >> > >> > > dataPoint.
> > > > > >> > >> > > In step 13 I am assigning closesetIndex to the key of the
> > > > > >> > >> > > corresponding row represented by dataPoint.
> > > > > >> > >> > > I think I am doing step 13 incorrectly.
> > > > > >> > >> > >
> > > > > >> > >> > > Also, I am unable to find a proper reference for this in the
> > > > > >> > >> > > reference links which you mentioned above.
> > > > >> > >> > >
> > > > >> > >> > >
> > > > >> > >> > > Thanks & Regards
> > > > >> > >> > > Parth Khatwani
> > > > >> > >> > >
> > > > >> > >> > >
> > > > >> > >> > >
> > > > >> > >> > >
> > > > >> > >> > >
> > > > >> > >> > > On Thu, Apr 13, 2017 at 6:24 PM, KHATWANI PARTH BHARAT <
> > > > >> > >> > > h2016...@pilani.bits-pilani.ac.in> wrote:
> > > > >> > >> > >
> > > > >> > >> > > > Dmitriy Sir,
> > > > > >> > >> > > > I have created a GitHub branch having the initial Kmeans code:
> > > > > >> > >> > > > <https://github.com/parth2691/Spark_Mahout/tree/Dmitriy-Lyubimov>
> > > > >> > >> > > >
> > > > >> > >> > > >
> > > > >> > >> > > > Thanks & Regards
> > > > >> > >> > > > Parth Khatwani
> > > > >> > >> > > >
> > > > > >> > >> > > > On Thu, Apr 13, 2017 at 3:19 AM, Andrew Palumbo <ap....@outlook.com>
> > > > > >> > >> > > > wrote:
> > > > >> > >> > > >
> > > > >> > >> > > >> +1 to creating a branch.
> > > > >> > >> > > >>
> > > > >> > >> > > >>
> > > > >> > >> > > >>
> > > > >> > >> > > >>
> > > > >> > >> > > >>
> > > > >> > >> > > >> -------- Original message --------
> > > > >> > >> > > >> From: Dmitriy Lyubimov <dlie...@gmail.com>
> > > > >> > >> > > >> Date: 04/12/2017 11:25 (GMT-08:00)
> > > > >> > >> > > >> To: dev@mahout.apache.org
> > > > > >> > >> > > >> Subject: Re: Trying to write the KMeans Clustering Using "Apache Mahout Samsara"
> > > > > >> > >> > > >>
> > > > > >> > >> > > >> can't say i can read this code well formatted that way...
> > > > > >> > >> > > >>
> > > > > >> > >> > > >> it would seem to me that the code is not using the broadcast
> > > > > >> > >> > > >> variable and instead is using closure variable. that's the
> > > > > >> > >> > > >> only thing i can immediately see by looking in the middle of it.
> > > > > >> > >> > > >>
> > > > > >> > >> > > >> it would be better if you created a branch on github for that
> > > > > >> > >> > > >> code that would allow for easy check-outs and comments.
> > > > >> > >> > > >>
> > > > >> > >> > > >> -d
> > > > >> > >> > > >>
> > > > > >> > >> > > >> On Wed, Apr 12, 2017 at 10:29 AM, KHATWANI PARTH BHARAT <
> > > > > >> > >> > > >> h2016...@pilani.bits-pilani.ac.in> wrote:
> > > > >> > >> > > >>
> > > > >> > >> > > >> > @Dmitriy Sir
> > > > >> > >> > > >> >
> > > > > >> > >> > > >> > I have completed the Kmeans code as per the algorithm you
> > > > > >> > >> > > >> > have outlined above.
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> > My code is as follows.
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> > This code works fine till step number 10.
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> > In step 11 I am assigning the new centroid index to the
> > > > > >> > >> > > >> > corresponding row key of each data point in the matrix.
> > > > > >> > >> > > >> > I think I am doing something wrong in step 11; maybe I am
> > > > > >> > >> > > >> > using incorrect syntax.
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> > Can you help me find out what I am doing wrong?
> > > > >> > >> > > >> >
> > > > >> > >> > > >> >
> > > > > >> > >> > > >> > //start of main method
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> > def main(args: Array[String]) {
> > > > > >> > >> > > >> >     //1. initialize the spark and mahout context
> > > > > >> > >> > > >> >     val conf = new SparkConf()
> > > > > >> > >> > > >> >       .setAppName("DRMExample")
> > > > > >> > >> > > >> >       .setMaster(args(0))
> > > > > >> > >> > > >> >       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> > > > > >> > >> > > >> >       .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
> > > > > >> > >> > > >> >     implicit val sc = new SparkDistributedContext(new SparkContext(conf))
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >     //2. read the data file and save it in the rdd
> > > > > >> > >> > > >> >     val lines = sc.textFile(args(1))
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >     //3. convert data read in as string in to array of double
> > > > > >> > >> > > >> >     val test = lines.map(line => line.split('\t').map(_.toDouble))
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >     //4. add a column having value 1 in array of double this will
> > > > > >> > >> > > >> >     // create something like (1 | D)',  which will be used while calculating
> > > > > >> > >> > > >> >     // (1 | D)'
> > > > > >> > >> > > >> >     val augumentedArray = test.map(addCentriodColumn _)
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >     //5. convert rdd of array of double in rdd of DenseVector
> > > > > >> > >> > > >> >     val rdd = augumentedArray.map(dvec(_))
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >     //6. convert rdd to DrmRdd
> > > > > >> > >> > > >> >     val rddMatrixLike: DrmRdd[Int] = rdd.zipWithIndex.map { case (v, idx) => (idx.toInt, v) }
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >     //7. convert DrmRdd to CheckpointedDrm[Int]
> > > > > >> > >> > > >> >     val matrix = drmWrap(rddMatrixLike)
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >     //8. seperating the column having all ones created in step 4 and will use it later
> > > > > >> > >> > > >> >     val oneVector = matrix(::, 0 until 1)
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >     //9. final input data in DrmLike[Int] format
> > > > > >> > >> > > >> >     val dataDrmX = matrix(::, 1 until 4)
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >     //9. Sampling to select initial centriods
> > > > > >> > >> > > >> >     val centriods = drmSampleKRows(dataDrmX, 2, false)
> > > > > >> > >> > > >> >     centriods.size
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >     //10. Broad Casting the initial centriods
> > > > > >> > >> > > >> >     val broadCastMatrix = drmBroadcast(centriods)
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >     //11. Iterating over the Data Matrix(in DrmLike[Int] format) to calculate the initial centriods
> > > > > >> > >> > > >> >     dataDrmX.mapBlock() {
> > > > > >> > >> > > >> >       case (keys, block) =>
> > > > > >> > >> > > >> >         for (row <- 0 until block.nrow) {
> > > > > >> > >> > > >> >           var dataPoint = block(row, ::)
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >           //12. findTheClosestCentriod find the closest centriod to the Data point specified by "dataPoint"
> > > > > >> > >> > > >> >           val closesetIndex = findTheClosestCentriod(dataPoint, centriods)
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >           //13. assigning closest index to key
> > > > > >> > >> > > >> >           keys(row) = closesetIndex
> > > > > >> > >> > > >> >         }
> > > > > >> > >> > > >> >         keys -> block
> > > > > >> > >> > > >> >     }
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >     //14. Calculating the (1|D)
> > > > > >> > >> > > >> >     val b = (oneVector cbind dataDrmX)
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >     //15. Aggregating Transpose (1|D)'
> > > > > >> > >> > > >> >     val bTranspose = (oneVector cbind dataDrmX).t
> > > > > >> > >> > > >> >     // after step 15 bTranspose will have data in the following format
> > > > > >> > >> > > >> >     /* (n+1)*K where n=dimension of the data point, K=number of clusters
> > > > > >> > >> > > >> >      * zeroth row will contain the count of points assigned to each cluster
> > > > > >> > >> > > >> >      * assuming 3d data points
> > > > > >> > >> > > >> >      */
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >     val nrows = b.nrow.toInt
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >     //16. slicing the count vectors out
> > > > > >> > >> > > >> >     val pointCountVectors = drmBroadcast(b(0 until 1, ::).collect(0, ::))
> > > > > >> > >> > > >> >     val vectorSums = b(1 until nrows, ::)
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >     //17. dividing the data point by count vector
> > > > > >> > >> > > >> >     vectorSums.mapBlock() {
> > > > > >> > >> > > >> >       case (keys, block) =>
> > > > > >> > >> > > >> >         for (row <- 0 until block.nrow) {
> > > > > >> > >> > > >> >           block(row, ::) /= pointCountVectors
> > > > > >> > >> > > >> >         }
> > > > > >> > >> > > >> >         keys -> block
> > > > > >> > >> > > >> >     }
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >     //18. seperating the count vectors
> > > > > >> > >> > > >> >     val newCentriods = vectorSums.t(::,1 until centriods.size)
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >     //19. iterate over the above code till convergence criteria is meet
> > > > > >> > >> > > >> >   }//end of main method
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >   // method to find the closest centriod to data point(vec: Vector in the arguments)
> > > > > >> > >> > > >> >   def findTheClosestCentriod(vec: Vector, matrix: Matrix): Int = {
> > > > > >> > >> > > >> >     var index = 0
> > > > > >> > >> > > >> >     var closest = Double.PositiveInfinity
> > > > > >> > >> > > >> >     for (row <- 0 until matrix.nrow) {
> > > > > >> > >> > > >> >       val squaredSum = ssr(vec, matrix(row, ::))
> > > > > >> > >> > > >> >       val tempDist = Math.sqrt(ssr(vec, matrix(row, ::)))
> > > > > >> > >> > > >> >       if (tempDist < closest) {
> > > > > >> > >> > > >> >         closest = tempDist
> > > > > >> > >> > > >> >         index = row
> > > > > >> > >> > > >> >       }
> > > > > >> > >> > > >> >     }
> > > > > >> > >> > > >> >     index
> > > > > >> > >> > > >> >   }
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >    //calculating the sum of squared distance between the points(Vectors)
> > > > > >> > >> > > >> >   def ssr(a: Vector, b: Vector): Double = {
> > > > > >> > >> > > >> >     (a - b) ^= 2 sum
> > > > > >> > >> > > >> >   }
> > > > > >> > >> > > >> >
> > > > > >> > >> > > >> >   //method used to create (1|D)
> > > > > >> > >> > > >> >   def addCentriodColumn(arg: Array[Double]): Array[Double] = {
> > > > > >> > >> > > >> >     val newArr = new Array[Double](arg.length + 1)
> > > > > >> > >> > > >> >     newArr(0) = 1.0;
> > > > > >> > >> > > >> >     for (i <- 0 until (arg.size)) {
> > > > > >> > >> > > >> >       newArr(i + 1) = arg(i);
> > > > > >> > >> > > >> >     }
> > > > > >> > >> > > >> >     newArr
> > > > > >> > >> > > >> >   }
> > > > >> > >> > > >> >
> > > > >> > >> > > >> >
> > > > >> > >> > > >> > Thanks & Regards
> > > > >> > >> > > >> > Parth Khatwani
> > > > >> > >> > > >> >
> > > > >> > >> > > >> >
> > > > >> > >> > > >> >
> > > > > >> > >> > > >> > On Mon, Apr 3, 2017 at 7:37 PM, KHATWANI PARTH BHARAT <
> > > > > >> > >> > > >> > h2016...@pilani.bits-pilani.ac.in> wrote:
> > > > >> > >> > > >> >
> > > > >> > >> > > >> > >
> > > > >> > >> > > >> > > ---------- Forwarded message ----------
> > > > >> > >> > > >> > > From: Dmitriy Lyubimov <dlie...@gmail.com>
> > > > >> > >> > > >> > > Date: Fri, Mar 31, 2017 at 11:34 PM
> > > > > >> > >> > > >> > > Subject: Re: Trying to write the KMeans Clustering Using "Apache Mahout Samsara"
> > > > > >> > >> > > >> > > To: "dev@mahout.apache.org" <dev@mahout.apache.org>
> > > > >> > >> > > >> > >
> > > > >> > >> > > >> > >
> > > > > >> > >> > > >> > > ps1 this assumes row-wise construction of A based on training
> > > > > >> > >> > > >> > > set of m n-dimensional points.
> > > > > >> > >> > > >> > > ps2 since we are doing multiple passes over A it may make
> > > > > >> > >> > > >> > > sense to make sure it is committed to spark cache (by using
> > > > > >> > >> > > >> > > checkpoint api), if spark is used
> > > > >> > >> > > >> > >
> > > > > >> > >> > > >> > > On Fri, Mar 31, 2017 at 10:53 AM, Dmitriy Lyubimov <dlie...@gmail.com>
> > > > > >> > >> > > >> > > wrote:
> > > > >> > >> > > >> > >
> > > > >> > >> > > >> > > > here is the outline. For details of APIs, please
> > > refer
> > > > >> to
> > > > >> > >> > samsara
> > > > >> > >> > > >> > manual
> > > > >> > >> > > >> > > > [2], i will not be be repeating it.
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > > Assume your training data input is m x n matrix
> A.
> > > For
> > > > >> > >> > simplicity
> > > > >> > >> > > >> let's
> > > > >> > >> > > >> > > > assume it's a DRM with int row keys, i.e.,
> > > > DrmLike[Int].
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > > Initialization:
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > > First, classic k-means starts by selecting
> initial
> > > > >> > clusters,
> > > > >> > >> by
> > > > >> > >> > > >> > sampling
> > > > >> > >> > > >> > > > them out. You can do that by using sampling api
> > [1],
> > > > >> thus
> > > > >> > >> > forming
> > > > >> > >> > > a
> > > > >> > >> > > >> k
> > > > >> > >> > > >> > x n
> > > > >> > >> > > >> > > > in-memory matrix C (current centroids). C is
> > > therefore
> > > > >> of
> > > > >> > >> > Mahout's
> > > > >> > >> > > >> > Matrix
> > > > >> > >> > > >> > > > type.
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > > You the proceed by alternating between cluster
> > > > >> assignments
> > > > >> > >> and
> > > > >> > >> > > >> > > > recompupting centroid matrix C till convergence
> > > based
> > > > on
> > > > >> > some
> > > > >> > >> > test
> > > > >> > >> > > >> or
> > > > >> > >> > > >> > > > simply limited by epoch count budget, your
> choice.
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > > Cluster assignments: here, we go over the current
> > > > >> > >> > > >> > > > generation of A and recompute centroid indexes for
> > > > >> > >> > > >> > > > each row in A. Once we recompute the index, we put it
> > > > >> > >> > > >> > > > into the row key. You can do that by assigning
> > > > >> > >> > > >> > > > centroid indices to the keys of A using operator
> > > > >> > >> > > >> > > > mapblock() (details in [2], [3], [4]). You also need
> > > > >> > >> > > >> > > > to broadcast C in order to be able to access it in an
> > > > >> > >> > > >> > > > efficient manner inside the mapblock() closure. Plenty
> > > > >> > >> > > >> > > > of examples of that are given in [2]. Essentially, in
> > > > >> > >> > > >> > > > mapblock, you'd reform the row keys to reflect the
> > > > >> > >> > > >> > > > cluster index in C. While going over A, you'd have a
> > > > >> > >> > > >> > > > "nearest neighbor" problem to solve for the row of A
> > > > >> > >> > > >> > > > and centroids C. This is the bulk of the computation
> > > > >> > >> > > >> > > > really, and there are a few tricks there that can
> > > > >> > >> > > >> > > > speed this step up in both exact and approximate
> > > > >> > >> > > >> > > > manner, but you can start with a naive search.
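A minimal sketch of the naive search described above, in plain Python rather than Mahout code. It only illustrates the logic that would run per row inside the mapblock() closure; the names `squared_distance` and `assign_clusters` are made up for this illustration, and cluster indices are 0-based here.

```python
# Plain-Python stand-in for the assignment step; this is not the Mahout API.

def squared_distance(x, c):
    """Squared Euclidean distance between a data row x and a centroid c."""
    return sum((xi - ci) ** 2 for xi, ci in zip(x, c))


def assign_clusters(rows, centroids):
    """Return, for each row of A, the index of its nearest centroid in C."""
    keys = []
    for x in rows:
        distances = [squared_distance(x, c) for c in centroids]
        # Naive search: scan all k centroids and keep the closest one.
        keys.append(distances.index(min(distances)))
    return keys


# Toy data: four points in the plane and two current centroids.
A = [[1.0, 1.0], [1.5, 2.0], [8.0, 8.0], [9.0, 9.5]]
C = [[1.0, 1.0], [9.0, 9.0]]
keys = assign_clusters(A, C)  # these indices would become the new row keys
```

In the distributed setting the same scan would be applied to every row of a block, with C read from the broadcast value.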
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > > Centroid recomputation:
> > > > >> > >> > > >> > > > once you have assigned centroids to the keys of matrix
> > > > >> > >> > > >> > > > A, you'd want to do an aggregating transpose of A to
> > > > >> > >> > > >> > > > compute essentially the average of the rows of A
> > > > >> > >> > > >> > > > grouped by the centroid key. The trick is to do a
> > > > >> > >> > > >> > > > computation of (1|A)' which will result in a matrix
> > > > >> > >> > > >> > > > of the shape (counts/sums of cluster rows). This is
> > > > >> > >> > > >> > > > the part I find difficult to explain without LaTeX
> > > > >> > >> > > >> > > > graphics.
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > > In Samsara, construction of (1|A)' corresponds to the
> > > > >> > >> > > >> > > > DRM expression
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > > (1 cbind A).t (again, see [2]).
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > > So when you compute, say,
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > > B = (1 | A)',
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > > then B is (n+1) x k, so each column contains a vector
> > > > >> > >> > > >> > > > corresponding to a cluster 1..k. In each such column,
> > > > >> > >> > > >> > > > the first element would be the number of points in
> > > > >> > >> > > >> > > > the cluster, and the rest of it would correspond to
> > > > >> > >> > > >> > > > the sum of all points in that cluster. So in order to
> > > > >> > >> > > >> > > > arrive at an updated matrix C, we need to collect B
> > > > >> > >> > > >> > > > into memory, and slice out the counters (first row)
> > > > >> > >> > > >> > > > from the rest of it.
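To make the shape of B concrete, here is a small plain-Python sketch of what the aggregating transpose of (1|A) computes. It is an illustration only, not Mahout code: the function name `aggregating_transpose_with_counts` is hypothetical, and cluster keys are 0-based here rather than 1..k.

```python
# Plain-Python illustration of B = (1 | A)' with aggregation by row key.
# This is not the Mahout implementation; rows sharing a key are simply summed.

def aggregating_transpose_with_counts(keys, rows, k):
    """Build the (n + 1) x k matrix B from row keys and the rows of A."""
    n = len(rows[0])
    B = [[0.0] * k for _ in range(n + 1)]
    for key, x in zip(keys, rows):
        augmented = [1.0] + list(x)       # one row of (1 | A)
        for i, value in enumerate(augmented):
            B[i][key] += value            # column `key` accumulates its rows
    return B


keys = [0, 0, 1, 1]                       # cluster index stored in each row key
A = [[1.0, 1.0], [1.5, 2.0], [8.0, 8.0], [9.0, 9.5]]
B = aggregating_transpose_with_counts(keys, A, k=2)

for row in B:
    print(row)
```

The first row of B holds the per-cluster counts, and the remaining n rows hold the per-cluster coordinate sums.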
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > > So, to compute C:
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > > C <- B (2:,:) each row divided by B(1,:)
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > > (watch out for empty clusters with 0 elements, this
> > > > >> > >> > > >> > > > will cause lack of convergence and NaNs in the newly
> > > > >> > >> > > >> > > > computed C).
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > > This operation obviously uses subblocking and row-wise
> > > > >> > >> > > >> > > > iteration over B, for which I am again making
> > > > >> > >> > > >> > > > reference to [2].
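A hedged plain-Python sketch of the division step, including one possible guard for empty clusters. Keeping the previous centroid for an empty cluster is an assumption of this sketch, not something the thread prescribes (re-sampling a new centroid would be another option). The function name is hypothetical and indices are 0-based, so the counters are B[0] rather than B(1,:).

```python
# Plain-Python illustration of C <- B(2:,:) divided by B(1,:), returned as a
# k x n matrix so it matches the layout of the centroid matrix C.

def centroids_from_counts_and_sums(B, previous_centroids):
    """Turn the (n + 1) x k counts/sums matrix B into k x n centroids."""
    counts = B[0]        # first row: number of points per cluster
    sums = B[1:]         # remaining n rows: per-cluster coordinate sums
    k = len(counts)
    n = len(sums)
    C = []
    for j in range(k):
        if counts[j] == 0:
            # Assumption: an empty cluster keeps its old centroid, which
            # avoids the division by zero that would otherwise produce NaNs.
            C.append(list(previous_centroids[j]))
        else:
            C.append([sums[i][j] / counts[j] for i in range(n)])
    return C


# Three clusters, the last of which received no points.
B = [[2.0, 2.0, 0.0],
     [2.5, 17.0, 0.0],
     [3.0, 17.5, 0.0]]
old_C = [[1.0, 1.0], [9.0, 9.0], [5.0, 5.0]]
C = centroids_from_counts_and_sums(B, old_C)
```

Note that the result is the transpose of the n x k slice B(2:,:), since C was introduced as a k x n matrix.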
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > > [1] https://github.com/apache/mahout/blob/master/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala#L149
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > > [2] Samsara manual, a bit dated but viable,
> > > > >> > >> > > >> > > > http://apache.github.io/mahout/doc/ScalaSparkBindings.html
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > > [3] scaladoc, again, dated but largely viable for the
> > > > >> > >> > > >> > > > purpose of this exercise:
> > > > >> > >> > > >> > > > http://apache.github.io/mahout/0.10.1/docs/mahout-math-scala/index.htm
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > > [4] mapblock etc.
> > > > >> > >> > > >> > > > http://apache.github.io/mahout/0.10.1/docs/mahout-math-scala/index.html#org.apache.mahout.math.drm.RLikeDrmOps
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > > On Fri, Mar 31, 2017 at 9:54 AM, KHATWANI PARTH BHARAT <
> > > > >> > >> > > >> > > > h2016...@pilani.bits-pilani.ac.in> wrote:
> > > > >> > >> > > >> > > >
> > > > >> > >> > > >> > > >> @Dmitriy can you please tell me again the approach to
> > > > >> > >> > > >> > > >> move ahead.
> > > > >> > >> > > >> > > >>
> > > > >> > >> > > >> > > >>
> > > > >> > >> > > >> > > >> Thanks
> > > > >> > >> > > >> > > >> Parth Khatwani
> > > > >> > >> > > >> > > >>
> > > > >> > >> > > >> > > >>
> > > > >> > >> > > >> > > >> On Fri, Mar 31, 2017 at 10:15 PM, KHATWANI PARTH BHARAT <
> > > > >> > >> > > >> > > >> h2016...@pilani.bits-pilani.ac.in> wrote:
> > > > >> > >> > > >> > > >>
> > > > >> > >> > > >> > > >> > Yes, I am unable to figure out the way ahead,
> > > > >> > >> > > >> > > >> > such as how to create the augmented matrix
> > > > >> > >> > > >> > > >> > A := (0|D) which you have mentioned.
> > > > >> > >> > > >> > > >> >
> > > > >> > >> > > >> > > >> >
> > > > >> > >> > > >> > > >> > On Fri, Mar 31, 2017 at 10:10 PM, Dmitriy Lyubimov <
> > > > >> > >> > > >> > > >> > dlie...@gmail.com> wrote:
> > > > >> > >> > > >> > > >> >
> > > > >> > >> > > >> > > >> >> Was my reply to your post on @user a bit confusing?
> > > > >> > >> > > >> > > >> >>
> > > > >> > >> > > >> > > >> >> On Fri, Mar 31, 2017 at 8:40 AM, KHATWANI PARTH BHARAT <
> > > > >> > >> > > >> > > >> >> h2016...@pilani.bits-pilani.ac.in> wrote:
> > > > >> > >> > > >> > > >> >>
> > > > >> > >> > > >> > > >> >> > Sir,
> > > > >> > >> > > >> > > >> >> > I am trying to write the k-means clustering
> > > > >> > >> > > >> > > >> >> > algorithm using Mahout Samsara, but I am a bit
> > > > >> > >> > > >> > > >> >> > confused about how to leverage the Distributed
> > > > >> > >> > > >> > > >> >> > Row Matrix for this. Can anybody help me with
> > > > >> > >> > > >> > > >> >> > it?
> > > > >> > >> > > >> > > >> >> >
> > > > >> > >> > > >> > > >> >> >
> > > > >> > >> > > >> > > >> >> >
> > > > >> > >> > > >> > > >> >> >
> > > > >> > >> > > >> > > >> >> >
> > > > >> > >> > > >> > > >> >> > Thanks
> > > > >> > >> > > >> > > >> >> > Parth Khatwani
> > > > >> > >> > > >> > > >> >> >
