can't say i can read this code well formatted that way...

it would seem to me that the code is not using the broadcast variable and
instead is using closure variable. that's the only thing i can immediately
see by looking in the middle of it.

it would be better if you created a branch on github for that code that
would allow for easy check-outs and comments.

-d

On Wed, Apr 12, 2017 at 10:29 AM, KHATWANI PARTH BHARAT <
h2016...@pilani.bits-pilani.ac.in> wrote:

> @Dmitriy Sir
>
> I have completed the Kmeans code as per the algorithm you have Outline
> above
>
> My code is as follows
>
> This code works fine till step number 10
>
> In step 11 i am assigning the new centriod index  to corresponding row key
> of data Point in the matrix
> I think i am doing something wrong in step 11 may be i am using incorrect
> syntax
>
> Can you help me find out what am i doing wrong.
>
>
> //start of main method
>
> def main(args: Array[String]) {
>      //1. initialize the spark and mahout context
>     val conf = new SparkConf()
>       .setAppName("DRMExample")
>       .setMaster(args(0))
>       .set("spark.serializer", "org.apache.spark.serializer.
> KryoSerializer")
>       .set("spark.kryo.registrator",
> "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
>     implicit val sc = new SparkDistributedContext(new SparkContext(conf))
>
>     //2. read the data file and save it in the rdd
>     val lines = sc.textFile(args(1))
>
>     //3. convert data read in as string in to array of double
>     val test = lines.map(line => line.split('\t').map(_.toDouble))
>
>     //4. add a column having value 1 in array of double this will
> create something like (1 | D)',  which will be used while calculating
> (1 | D)'
>     val augumentedArray = test.map(addCentriodColumn _)
>
>     //5. convert rdd of array of double in rdd of DenseVector
>     val rdd = augumentedArray.map(dvec(_))
>
>     //6. convert rdd to DrmRdd
>     val rddMatrixLike: DrmRdd[Int] = rdd.zipWithIndex.map { case (v,
> idx) => (idx.toInt, v) }        //7. convert DrmRdd to
> CheckpointedDrm[Int]    val matrix = drmWrap(rddMatrixLike)    //8.
> seperating the column having all ones created in step 4 and will use
> it later    val oneVector = matrix(::, 0 until 1)        //9. final
> input data in DrmLike[Int] format    val dataDrmX = matrix(::, 1 until
> 4)            //9. Sampling to select initial centriods    val
> centriods = drmSampleKRows(dataDrmX, 2, false)    centriods.size
> //10. Broad Casting the initial centriods    val broadCastMatrix =
> drmBroadcast(centriods)            //11. Iterating over the Data
> Matrix(in DrmLike[Int] format) to calculate the initial centriods
> dataDrmX.mapBlock() {      case (keys, block) =>        for (row <- 0
> until block.nrow) {          var dataPoint = block(row, ::)
>         //12. findTheClosestCentriod find the closest centriod to the
> Data point specified by "dataPoint"          val closesetIndex =
> findTheClosestCentriod(dataPoint, centriods)                    //13.
> assigning closest index to key          keys(row) = closesetIndex
>   }        keys -> block    }
>
>     //14. Calculating the (1|D)      val b = (oneVector cbind
> dataDrmX)        //15. Aggregating Transpose (1|D)'    val bTranspose
> = (oneVector cbind dataDrmX).t    // after step 15 bTranspose will
> have data in the following format        /*(n+1)*K where n=dimension
> of the data point, K=number of clusters    * zeroth row will contain
> the count of points assigned to each cluster    * assuming 3d data
> points     *     */
>
>
>     val nrows = b.nrow.toInt    //16. slicing the count vectors out
>  val pointCountVectors = drmBroadcast(b(0 until 1, ::).collect(0, ::))
>    val vectorSums = b(1 until nrows, ::)    //17. dividing the data
> point by count vector    vectorSums.mapBlock() {      case (keys,
> block) =>        for (row <- 0 until block.nrow) {          block(row,
> ::) /= pointCountVectors        }        keys -> block    }    //18.
> seperating the count vectors    val newCentriods = vectorSums.t(::,1
> until centriods.size)            //19. iterate over the above code
> till convergence criteria is meet   }//end of main method
>
>
>
>   // method to find the closest centriod to data point( vec: Vector
> in the arguments)  def findTheClosestCentriod(vec: Vector, matrix:
> Matrix): Int = {
>     var index = 0
>     var closest = Double.PositiveInfinity
>     for (row <- 0 until matrix.nrow) {
>       val squaredSum = ssr(vec, matrix(row, ::))
>       val tempDist = Math.sqrt(ssr(vec, matrix(row, ::)))
>       if (tempDist < closest) {
>         closest = tempDist
>         index = row
>       }
>     }
>     index
>   }
>
>    //calculating the sum of squared distance between the points(Vectors)
>   def ssr(a: Vector, b: Vector): Double = {
>     (a - b) ^= 2 sum
>   }
>
>   //method used to create (1|D)
>   def addCentriodColumn(arg: Array[Double]): Array[Double] = {
>     val newArr = new Array[Double](arg.length + 1)
>     newArr(0) = 1.0;
>     for (i <- 0 until (arg.size)) {
>       newArr(i + 1) = arg(i);
>     }
>     newArr
>   }
>
>
> Thanks & Regards
> Parth Khatwani
>
>
>
> On Mon, Apr 3, 2017 at 7:37 PM, KHATWANI PARTH BHARAT <
> h2016...@pilani.bits-pilani.ac.in> wrote:
>
> >
> > ---------- Forwarded message ----------
> > From: Dmitriy Lyubimov <dlie...@gmail.com>
> > Date: Fri, Mar 31, 2017 at 11:34 PM
> > Subject: Re: Trying to write the KMeans Clustering Using "Apache Mahout
> > Samsara"
> > To: "dev@mahout.apache.org" <dev@mahout.apache.org>
> >
> >
> > ps1 this assumes row-wise construction of A based on training set of m
> > n-dimensional points.
> > ps2 since we are doing multiple passes over A it may make sense to make
> > sure it is committed to spark cache (by using checkpoint api), if spark
> is
> > used
> >
> > On Fri, Mar 31, 2017 at 10:53 AM, Dmitriy Lyubimov <dlie...@gmail.com>
> > wrote:
> >
> > > here is the outline. For details of APIs, please refer to samsara
> manual
> > > [2], i will not be be repeating it.
> > >
> > > Assume your training data input is m x n matrix A. For simplicity let's
> > > assume it's a DRM with int row keys, i.e., DrmLike[Int].
> > >
> > > Initialization:
> > >
> > > First, classic k-means starts by selecting initial clusters, by
> sampling
> > > them out. You can do that by using sampling api [1], thus forming a k
> x n
> > > in-memory matrix C (current centroids). C is therefore of Mahout's
> Matrix
> > > type.
> > >
> > > You the proceed by alternating between cluster assignments and
> > > recompupting centroid matrix C till convergence based on some test or
> > > simply limited by epoch count budget, your choice.
> > >
> > > Cluster assignments: here, we go over current generation of A and
> > > recompute centroid indexes for each row in A. Once we recompute index,
> we
> > > put it into the row key . You can do that by assigning centroid indices
> > to
> > > keys of A using operator mapblock() (details in [2], [3], [4]). You
> also
> > > need to broadcast C in order to be able to access it in efficient
> manner
> > > inside mapblock() closure. Examples of that are plenty given in [2].
> > > Essentially, in mapblock, you'd reform the row keys to reflect cluster
> > > index in C. while going over A, you'd have a "nearest neighbor" problem
> > to
> > > solve for the row of A and centroids C. This is the bulk of computation
> > > really, and there are a few tricks there that can speed this step up in
> > > both exact and approximate manner, but you can start with a naive
> search.
> > >
> > > Centroid recomputation:
> > > once you assigned centroids to the keys of marix A, you'd want to do an
> > > aggregating transpose of A to compute essentially average of row A
> > grouped
> > > by the centroid key. The trick is to do a computation of (1|A)' which
> > will
> > > results in a matrix of the shape (Counts/sums of cluster rows). This is
> > the
> > > part i find difficult to explain without a latex graphics.
> > >
> > > In Samsara, construction of (1|A)' corresponds to DRM expression
> > >
> > > (1 cbind A).t (again, see [2]).
> > >
> > > So when you compute, say,
> > >
> > > B = (1 | A)',
> > >
> > > then B is (n+1) x k, so each column contains a vector corresponding to
> a
> > > cluster 1..k. In such column, the first element would be # of points in
> > the
> > > cluster, and the rest of it would correspond to sum of all points. So
> in
> > > order to arrive to an updated matrix C, we need to collect B into
> memory,
> > > and slice out counters (first row) from the rest of it.
> > >
> > > So, to compute C:
> > >
> > > C <- B (2:,:) each row divided by B(1,:)
> > >
> > > (watch out for empty clusters with 0 elements, this will cause lack of
> > > convergence and NaNs in the newly computed C).
> > >
> > > This operation obviously uses subblocking and row-wise iteration over
> B,
> > > for which i am again making reference to [2].
> > >
> > >
> > > [1] https://github.com/apache/mahout/blob/master/math-scala/
> > > src/main/scala/org/apache/mahout/math/drm/package.scala#L149
> > >
> > > [2], Sasmara manual, a bit dated but viable, http://apache.github.
> > > io/mahout/doc/ScalaSparkBindings.html
> > >
> > > [3] scaladoc, again, dated but largely viable for the purpose of this
> > > exercise:
> > > http://apache.github.io/mahout/0.10.1/docs/mahout-math-scala/index.htm
> > >
> > > [4] mapblock etc. http://apache.github.io/mahout/0.10.1/docs/mahout-
> > > math-scala/index.html#org.apache.mahout.math.drm.RLikeDrmOps
> > >
> > > On Fri, Mar 31, 2017 at 9:54 AM, KHATWANI PARTH BHARAT <
> > > h2016...@pilani.bits-pilani.ac.in> wrote:
> > >
> > >> @Dmitriycan you please again tell me the approach to move ahead.
> > >>
> > >>
> > >> Thanks
> > >> Parth Khatwani
> > >>
> > >>
> > >> On Fri, Mar 31, 2017 at 10:15 PM, KHATWANI PARTH BHARAT <
> > >> h2016...@pilani.bits-pilani.ac.in> wrote:
> > >>
> > >> > yes i am unable to figure out the way ahead.
> > >> > Like how to create the augmented matrix A := (0|D) which you have
> > >> > mentioned.
> > >> >
> > >> >
> > >> > On Fri, Mar 31, 2017 at 10:10 PM, Dmitriy Lyubimov <
> dlie...@gmail.com
> > >
> > >> > wrote:
> > >> >
> > >> >> was my reply for your post on @user has been a bit confusing?
> > >> >>
> > >> >> On Fri, Mar 31, 2017 at 8:40 AM, KHATWANI PARTH BHARAT <
> > >> >> h2016...@pilani.bits-pilani.ac.in> wrote:
> > >> >>
> > >> >> > Sir,
> > >> >> > I am trying to write the kmeans clustering algorithm using Mahout
> > >> >> Samsara
> > >> >> > but i am bit confused
> > >> >> > about how to leverage Distributed Row Matrix for the same. Can
> > >> anybody
> > >> >> help
> > >> >> > me with same.
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> > Thanks
> > >> >> > Parth Khatwani
> > >> >> >
> > >> >>
> > >> >
> > >> >
> > >>
> > >
> > >
> >
> >
>

Reply via email to