The problem is that groupByKey means “bring all the points with this same key 
to the same JVM”. Your average function takes a Seq of points as input, so all 
the points for a key have to be there at once. This means that a) all points 
will be sent across the network in a cluster, which is slow (and Spark goes 
through this sending code path even in local mode, so it still serializes the 
data), and b) you’ll get out-of-memory errors if that Seq is too big. 
reduceByKey, by contrast, can combine values within each partition first, so 
only one partial (sum, count) per key has to move. In large-scale data 
processing, data movement is often the biggest cost, so you have to choose 
your operations carefully.
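
To make the difference concrete, here is a minimal sketch in plain Scala 
collections (it is not Spark and does not use Spark's API). Each inner Seq 
stands in for one partition, and the "shuffle" is simply whatever would have 
to leave a partition. The names ShuffleSketch, partitions, add, groupShuffled 
and reduceShuffled, as well as the sample points, are made up for 
illustration.

```scala
object ShuffleSketch {
  type Point = (Double, Double)

  // Hypothetical data: (clusterId, point) pairs spread over two partitions.
  val partitions: Seq[Seq[(Int, Point)]] = Seq(
    Seq((0, (1.0, 1.0)), (0, (3.0, 3.0)), (1, (10.0, 10.0))),
    Seq((0, (2.0, 2.0)), (1, (12.0, 12.0)), (1, (14.0, 14.0)))
  )

  // Merge two partial (sum, count) values.
  def add(a: (Point, Int), b: (Point, Int)): (Point, Int) =
    ((a._1._1 + b._1._1, a._1._2 + b._1._2), a._2 + b._2)

  // groupByKey style: every single record leaves its partition unchanged.
  val groupShuffled: Seq[(Int, Point)] = partitions.flatten

  // reduceByKey style: combine inside each partition first, then send only
  // one (sum, count) record per key per partition.
  val reduceShuffled: Seq[(Int, (Point, Int))] = partitions.flatMap { part =>
    part.groupBy(_._1).map { case (k, recs) =>
      (k, recs.map { case (_, p) => (p, 1) }.reduce(add(_, _)))
    }
  }

  // Merging the partial results on the receiving side yields the centroids.
  val centroids: Map[Int, Point] =
    reduceShuffled.groupBy(_._1).map { case (k, partials) =>
      val ((sx, sy), n) = partials.map(_._2).reduce(add(_, _))
      (k, (sx / n, sy / n))
    }

  def main(args: Array[String]): Unit = {
    println(s"records moved, group style:  ${groupShuffled.size}")
    println(s"records moved, reduce style: ${reduceShuffled.size}")
    println(s"centroids: $centroids")
  }
}
```

With this toy input the group style moves 6 records and the reduce style 
moves 4. The gap grows with the data: the reduce style is bounded by (number 
of keys) x (number of partitions), while the group style always moves every 
point and must hold a whole cluster in memory on one machine.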

Matei

On Apr 19, 2014, at 4:04 AM, ticup <tim.coppiete...@gmail.com> wrote:

> Hi,
> 
> I was playing around with other k-means implementations in Scala/Spark in
> order to test performances and get a better grasp on Spark.
> 
> Now, I made one similar to the one from the examples
> (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala),
> except that it's a bit less clever. Nevertheless, I expect a non-expert
> scala/spark programmer to write similar code instead of that from the
> example.
> 
> Here is how they compare: in the step of calculating the new centroids (this
> is done by taking the average of all points belonging to the current
> centroids - the main workhorse of the algo), where the *example algorithm*
> adds the points of the same cluster and keeps track of the number of points
> in each cluster in 1 step (by using reduceByKey and keeping a counter in the
> reduce value):
> 
> val closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
> 
> val pointStats = closest.reduceByKey { case ((x1, y1), (x2, y2)) =>
>   (x1 + x2, y1 + y2)
> }
> 
> and then proceeds by dividing the sum of all points of a cluster by the
> counted number of points in the cluster:
> 
> val newPoints = pointStats.map { pair =>
>   (pair._1, pair._2._1 / pair._2._2)
> }.collectAsMap()
> 
> Afterwards the change of the new centroids is calculated in order to know
> when to stop iterating:
> 
> tempDist = 0.0
> 
> for (i <- 0 until K) {
>     tempDist += kPoints(i).squaredDist(newPoints(i))
> }
> 
> 
> 
> *my algorithm*
> (https://github.com/ticup/k-means-spark/blob/master/src/main/scala/k-means.scala)
> is less clever, but more straightforward: it just groups all the points of
> each cluster and then proceeds to calculate the average on those points and
> adds the difference with the previous centroid to an accumulator:
> 
> // accumulator for differences new centroids
> dist = sc.accumulator(0.0)
> 
> // calculate new centroids + add difference to old centroids
> centroids = closest.groupByKey().map{case(i, points) =>
>    val newCentroid = average(points)
>    dist += centroids(i).squaredDist(newCentroid)
>    newCentroid
> }.collect()
> 
> with:
> 
> def average(points: Seq[Vector]) : Vector = {
>    points.reduce(_+_) / points.length
> }
> 
> So, the big differences are:
> 
> 1. Uses an accumulator
> 2. Does excessive work by not cleverly calculating the average
> 3. Accesses the centroids val from within the map
> 
> 
> Now, the reason I'm here: this version runs EXTREMELY slowly and gets
> outOfHeapMemory exceptions for data input that the original algorithm easily
> solves in ~5 seconds. I'm trying to pinpoint what exactly is causing this
> huge difference. The use of an accumulator shouldn't really affect the
> performance, and it doesn't: I tried it without the accumulator and it
> stays as slow. Further, I expect the excessive work to slow down the
> algorithm by a factor of 2 or so, but this is really a slowdown by a
> factor of 10 or more.
> 
> Even with 1 worker and 1 core (thus no parallelism) the difference in speed
> stays the same, so it's not because the averaging is not parallelised
> properly. There must be something going on that is much more important.
> 
> Could someone give me pointers on what exactly is happening here? It can't
> be because I'm just accessing the centroids value from within the closure?
> 
> Speed comparison:
> 
> The *slow algorithm*: 44 seconds to perform the map
> 14/04/19 13:03:15 INFO scheduler.DAGScheduler: Stage 3 (map at
> k-means.scala:114) finished in 43.909 s
> 
> 
> The *fast algorithm*: more or less the same operations (in 2 steps instead
> of 1) in 2.2 seconds
> 
> 14/04/19 12:52:29 INFO scheduler.DAGScheduler: Stage 3 (reduceByKey at
> k-means.scala:84) finished in 2.090 s
> ....
> 14/04/19 12:52:29 INFO scheduler.DAGScheduler: Stage 2 (collectAsMap at
> k-means.scala:86) finished in 0.117 s
> 
> 
> Thanks in advance,
> Tim.
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/extremely-slow-k-means-version-tp4489.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
