extremely slow k-means version
Hi,

I was playing around with other k-means implementations in Scala/Spark in order to test performance and get a better grasp on Spark. I made one similar to the one from the examples (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala), except that it's a bit less clever. Nevertheless, I'd expect a non-expert Scala/Spark programmer to write similar code instead of the example's. Here is how they compare, in the step that calculates the new centroids (taking the average of all points belonging to the current centroid - the main workhorse of the algorithm).

The *example algorithm* adds up the points of the same cluster and keeps track of the number of points in each cluster in one step (by using reduceByKey and keeping a counter in the reduce value):

    val closest = data.map(p => (closestPoint(p, kPoints), (p, 1)))
    val pointStats = closest.reduceByKey { case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2) }

and then divides the sum of all points of a cluster by the counted number of points in the cluster:

    val newPoints = pointStats.map { pair => (pair._1, pair._2._1 / pair._2._2) }.collectAsMap()

Afterwards the change in the centroids is calculated in order to know when to stop iterating:

    tempDist = 0.0
    for (i <- 0 until K) {
      tempDist += kPoints(i).squaredDist(newPoints(i))
    }

*My algorithm* (https://github.com/ticup/k-means-spark/blob/master/src/main/scala/k-means.scala) is less clever, but more straightforward: it just groups all the points of each cluster and then calculates the average over those points, adding the difference from the previous centroid to an accumulator:

    // accumulator for differences between new and old centroids
    dist = sc.accumulator(0.0)

    // calculate new centroids + add difference to old centroids
    centroids = closest.groupByKey().map { case (i, points) =>
      val newCentroid = average(points)
      dist += centroids(i).squaredDist(newCentroid)
      newCentroid
    }.collect()

with:

    def average(points: Seq[Vector]): Vector = {
      points.reduce(_ + _) / points.length
    }

So, the big differences are:
1. Use of an accumulator
2. Doing excessive work by not calculating the average cleverly
3. Accessing the centroids val from within the map

Now, the reason I'm here: this version runs EXTREMELY slowly and gets out-of-memory (heap) errors on input that the original algorithm easily handles in ~5 seconds, and I'm trying to pinpoint what exactly is causing this huge difference. The use of an accumulator shouldn't really affect performance, and it doesn't: I tried it without the accumulator and it stays just as slow. Further, I'd expect the excessive work to slow the algorithm down by a factor of 2 or so, but this is a slowdown by a factor of 10 or more. Even with 1 worker and 1 core (thus no parallelism) the difference in speed stays the same, so it's not that the averaging isn't parallelised properly; something much more important must be going on. Could someone give me pointers on what exactly is happening here? It can't just be because I'm accessing the centroids value from within the closure?
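(For illustration only: a minimal self-contained sketch of the groupByKey-based update described above, so the comparison can be reproduced locally. The Vec case class is a made-up stand-in for the Vector type used in the linked repo, and the centroids and data are toy values.)

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    // Vec is a made-up stand-in for the Vector type in the linked repo.
    case class Vec(x: Double, y: Double) {
      def +(o: Vec): Vec = Vec(x + o.x, y + o.y)
      def /(d: Double): Vec = Vec(x / d, y / d)
      def squaredDist(o: Vec): Double = {
        val dx = x - o.x; val dy = y - o.y
        dx * dx + dy * dy
      }
    }

    object GroupByKeyStep {
      // same shape as average() above, but over the Iterable that groupByKey yields
      def average(points: Iterable[Vec]): Vec = points.reduce(_ + _) / points.size

      def main(args: Array[String]) {
        val sc = new SparkContext("local", "kmeans-groupByKey-sketch")
        val centroids = Array(Vec(0, 0), Vec(10, 10))                        // toy current centroids
        val data = sc.parallelize(Seq(Vec(1, 1), Vec(2, 0), Vec(9, 9), Vec(11, 12)))

        // pair every point with the index of its closest centroid
        val closest = data.map { p =>
          val i = centroids.indices.minBy(j => centroids(j).squaredDist(p))
          (i, p)
        }

        // the groupByKey formulation: all points of a cluster end up in one collection
        val dist = sc.accumulator(0.0)
        val newCentroids = closest.groupByKey().map { case (i, points) =>
          val c = average(points)
          dist += centroids(i).squaredDist(c)
          (i, c)
        }.collectAsMap()

        println(newCentroids + ", total movement: " + dist.value)
        sc.stop()
      }
    }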
Speed comparison:

The *slow algorithm*: 44 seconds to perform the map
14/04/19 13:03:15 INFO scheduler.DAGScheduler: Stage 3 (map at k-means.scala:114) finished in 43.909 s

The *fast algorithm*: more or less the same operations (in 2 steps instead of 1) in 2.2 seconds
14/04/19 12:52:29 INFO scheduler.DAGScheduler: Stage 3 (reduceByKey at k-means.scala:84) finished in 2.090 s
14/04/19 12:52:29 INFO scheduler.DAGScheduler: Stage 2 (collectAsMap at k-means.scala:86) finished in 0.117 s

Thanks in advance,
Tim.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/extremely-slow-k-means-version-tp4489.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: ui broken in latest 1.0.0
got it. makes sense. i am surprised it worked before...

On Apr 18, 2014 9:12 PM, Andrew Or and...@databricks.com wrote:

Hi Koert,

I've tracked down what the bug is. The caveat is that each StageInfo only keeps around the RDDInfo of the last RDD associated with the Stage. More concretely, if you have something like

    sc.parallelize(1 to 1000).persist.map(i => (i, i)).count()

this creates two RDDs within one Stage, and the persisted RDD doesn't show up on the UI because it is not the last RDD of this stage. I filed a JIRA for this here: https://issues.apache.org/jira/browse/SPARK-1538. Thanks again for reporting this. I will push out a fix shortly.

Andrew

On Tue, Apr 8, 2014 at 1:30 PM, Koert Kuipers ko...@tresata.com wrote:

our one cached RDD in this run has id 3

*** onStageSubmitted **
rddInfo: RDD 2 (2) Storage: StorageLevel(false, false, false, false, 1); CachedPartitions: 0; TotalPartitions: 1; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
_rddInfoMap: Map(2 -> RDD 2 (2) Storage: StorageLevel(false, false, false, false, 1); CachedPartitions: 0; TotalPartitions: 1; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B)

*** onTaskEnd **
_rddInfoMap: Map(2 -> RDD 2 (2) Storage: StorageLevel(false, false, false, false, 1); CachedPartitions: 0; TotalPartitions: 1; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B)
storageStatusList: List(StorageStatus(BlockManagerId(driver, 192.168.3.169, 34330, 0),579325132,Map()))

*** onStageCompleted **
_rddInfoMap: Map()

*** onStageSubmitted **
rddInfo: RDD 7 (7) Storage: StorageLevel(false, false, false, false, 1); CachedPartitions: 0; TotalPartitions: 1; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
_rddInfoMap: Map(7 -> RDD 7 (7) Storage: StorageLevel(false, false, false, false, 1); CachedPartitions: 0; TotalPartitions: 1; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B)

*** updateRDDInfo **

*** onTaskEnd **
_rddInfoMap: Map(7 -> RDD 7 (7) Storage: StorageLevel(false, false, false, false, 1); CachedPartitions: 0; TotalPartitions: 1; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B)
storageStatusList: List(StorageStatus(BlockManagerId(driver, 192.168.3.169, 34330, 0),579325132,Map(rdd_3_0 -> BlockStatus(StorageLevel(false, true, false, true, 1),19944,0,0

*** onStageCompleted **
_rddInfoMap: Map()

On Tue, Apr 8, 2014 at 4:20 PM, Koert Kuipers ko...@tresata.com wrote:

1) at the end of the callback
2) yes, we simply expose sc.getRDDStorageInfo to the user via REST
3) yes, exactly. we define the RDDs at startup, and all of them are cached. from that point on we only do calculations on these cached RDDs. i will add some more println statements for storageStatusList

On Tue, Apr 8, 2014 at 4:01 PM, Andrew Or and...@databricks.com wrote:

Hi Koert,

Thanks for pointing this out. However, I am unable to reproduce this locally. It seems that there is a discrepancy between what the BlockManagerUI and the SparkContext think is persisted. This is strange because both sources ultimately derive this information from the same place - by doing sc.getExecutorStorageStatus.

I have a couple of questions for you:
1) In your print statements, do you print them at the beginning or at the end of each callback? It would be good to keep them at the end, since at the beginning the data structures have not been processed yet.
2) You mention that you get the RDD info through your own API. How do you get this information? Is it through sc.getRDDStorageInfo?
3) What did your application do to produce this behavior? Did you make an RDD, persist it once, and then use it many times afterwards, or something similar?

It would be super helpful if you could also print out what StorageStatusListener's storageStatusList looks like by the end of each onTaskEnd. I will continue to look into this on my side, but do let me know once you have any updates.

Andrew

On Tue, Apr 8, 2014 at 11:26 AM, Koert Kuipers ko...@tresata.com wrote:

yet at the same time i can see via our own api:
storageInfo: { diskSize: 0, memSize: 19944, numCachedPartitions: 1, numPartitions: 1 }

On Tue, Apr 8, 2014 at 2:25 PM, Koert Kuipers ko...@tresata.com wrote:

i put some println statements in BlockManagerUI. i have RDDs that are cached in memory. I see this:

*** onStageSubmitted **
rddInfo: RDD 2 (2) Storage: StorageLevel(false, false, false, false, 1); CachedPartitions: 0; TotalPartitions: 1; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
_rddInfoMap: Map(2 -> RDD 2 (2) Storage: StorageLevel(false, false, false, false, 1); CachedPartitions: 0; TotalPartitions:
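(For illustration only: a spark-shell sketch of the scenario Andrew describes, with made-up variable names. Per the explanation above, only the last RDD of the stage is kept in StageInfo, so the persisted RDD never reaches the UI through stage events, while the SparkContext itself still knows about the cached blocks.)

    // One stage, two RDDs: the persisted RDD is *not* the last RDD of the stage.
    val persisted = sc.parallelize(1 to 1000).persist()   // cached RDD
    val mapped    = persisted.map(i => (i, i))             // last RDD of the stage
    mapped.count()                                          // one stage covering both RDDs

    // The SparkContext still reports the cached blocks, which is why an API based on
    // sc.getRDDStorageInfo sees them even though the stage-event-driven UI does not.
    sc.getRDDStorageInfo.foreach(println)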
Re: ui broken in latest 1.0.0
The reason it worked before is that the UI would directly access sc.getStorageStatus instead of getting it through Task and Stage events. This is not necessarily the best design, however, because the SparkContext and the SparkUI are closely coupled, and there is no way to create a SparkUI independently of an application.

On Sat, Apr 19, 2014 at 7:45 AM, Koert Kuipers ko...@tresata.com wrote: [...]
Re: extremely slow k-means version
The problem is that groupByKey means “bring all the points with this same key to the same JVM”. Your input to average is a Seq[Point], so you have to have all the points of a cluster there. This means that a) all points will be sent across the network in a cluster, which is slow (and Spark goes through this sending code path even in local mode, so it serializes the data), and b) you’ll get out-of-memory errors if that Seq is too big.

In large-scale data processing, data movement is often the biggest cost, so you have to choose carefully which operations to use.

Matei

On Apr 19, 2014, at 4:04 AM, ticup tim.coppiete...@gmail.com wrote: [...]
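(To make the data-movement point concrete, a sketch of the shuffle-friendly alternative - essentially what the SparkKMeans example quoted above does: only one partial (sum, count) pair per cluster crosses the network, instead of every point. The Vector type with + and / is assumed to be the same one used in the k-means code above, and pair-RDD operations assume import org.apache.spark.SparkContext._ is in scope on pre-1.3 Spark.)

    // closest: RDD[(Int, Vector)], pairing each point with its cluster id, as above
    val pointStats = closest
      .mapValues(p => (p, 1))
      .reduceByKey { case ((sum1, n1), (sum2, n2)) => (sum1 + sum2, n1 + n2) }

    // the per-cluster division now runs on K small (sum, count) pairs,
    // not on a Seq holding every point of the cluster
    val dist = sc.accumulator(0.0)
    val newCentroids = pointStats.map { case (i, (sum, n)) =>
      val c = sum / n
      dist += centroids(i).squaredDist(c)
      (i, c)
    }.collectAsMap()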
Re: extremely slow k-means version
Thanks a lot for the explanation, Matei. As a matter of fact, I was just reading the paper on narrow and wide dependencies and saw that groupByKey is indeed a wide dependency, which, as you explained, is the problem.

Maybe it wouldn't be a bad idea to have a section in the docs on wide/narrow dependencies, and perhaps, for each transformation, the kind of dependency it creates? Although it's mostly obvious, it would better stress the fact that you need to choose your transformations carefully and that some are much preferred over others.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/extremely-slow-k-means-version-tp4489p4493.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
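(As a small illustration of the distinction being discussed: one way to see whether a transformation introduces a wide, shuffle dependency is to inspect the RDD lineage. This is a spark-shell sketch with made-up data.)

    val pairs   = sc.parallelize(1 to 100).map(x => (x % 3, x))   // map: narrow dependency
    val grouped = pairs.groupByKey()                               // groupByKey: wide (shuffle) dependency

    // The lineage shows a ShuffledRDD where the wide dependency sits.
    println(grouped.toDebugString)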
Re: Anyone using value classes in RDDs?
No, you can wrap other types in value classes as well. You can try it in the REPL:

    scala> case class ID(val id: String) extends AnyVal
    defined class ID

    scala> val i = ID("foo")
    i: ID = ID(foo)

On Fri, Apr 18, 2014 at 4:14 PM, Koert Kuipers [via Apache Spark User List] ml-node+s1001560n4475...@n3.nabble.com wrote:

isn't value classes for primitives (AnyVal) only? that doesn't apply to String, which is an object (AnyRef)

On Fri, Apr 18, 2014 at 2:51 PM, kamatsuoka [hidden email] wrote:

I'm wondering if anyone has tried using value classes in RDDs? My use case is that I have a number of RDDs containing strings, e.g.

    val r1: RDD[(String, (String, Int))] = ...
    val r2: RDD[(String, (String, Int))] = ...

and it might be clearer if I wrote

    case class ID(val id: String) extends AnyVal
    case class Name(val id: String) extends AnyVal

    val r1: RDD[(ID, (Name, Int))] = ...
    val r2: RDD[(Name, (ID, Int))] = ...

This seems like a pretty typical use case for value classes, but I haven't noticed anyone talking about it. Although, I think you'd have to read through all of the Spark code paths to know whether allocation is required (http://docs.scala-lang.org/overviews/core/value-classes.html), so some comparative performance testing would be called for.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Anyone-using-value-classes-in-RDDs-tp4464.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

--
Kenji

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Anyone-using-value-classes-in-RDDs-tp4464p4494.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
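(A small sketch of the allocation caveat mentioned above, following the rules in the linked value-classes overview; whether this actually matters for RDD performance is exactly the open question in this thread, so treat it as illustrative only. Names and values are made up.)

    case class ID(val id: String) extends AnyVal

    val plain: ID = ID("user-1")          // no wrapper allocated: just the underlying String at runtime
    def describe(a: Any): String = a.toString
    describe(plain)                        // allocates: the value class is treated as another type (Any)
    val ids: Seq[ID] = Seq(ID("a"))        // allocates: ID is used as a type argument, as it would be in RDD[(ID, ...)]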
Help with error initializing SparkR.
I can't initialize the sc context after a successful install on the Cloudera quickstart VM. This is the error message:

    library(SparkR)
    Loading required package: rJava
    [SparkR] Initializing with classpath /usr/lib64/R/library/SparkR/sparkr-assembly-0.1.jar

    sc <- sparkR.init("local")
    Error in .jcall("RJavaTools", "Ljava/lang/Object;", "invokeMethod", cl, :
      java.lang.NoClassDefFoundError: scala.collection.immutable.Vector

I also created an issue on GitHub, which contains more details: https://github.com/amplab-extras/SparkR-pkg/issues/46

Thanks for any help.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Help-with-error-initializing-SparkR-tp4495.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
questions about toArray and ClassTag
Hi all,

I'm quite new to Scala, and I'm doing some tests in the Spark shell:

    val b = a.mapPartitions { D =>
      val p = D.toArray
      ...
      p.toIterator
    }

When a is an RDD of type RDD[Int], b.collect() works. But when I change a to RDD[MyOwnType], b.collect() returns an error:

    14/04/20 10:14:46 ERROR OneForOneStrategy: [L$line5.$read$$iwC$$iwC$$iwC$$iwC$InputMatrix;
    java.lang.ArrayStoreException: [L$line5.$read$$iwC$$iwC$$iwC$$iwC$InputMatrix;
        at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)

I figured out that the question is about toArray:

    def toArray[B >: A](implicit arg0: ClassTag[B]): Array[B]

"an array containing all elements of this traversable or iterator. A ClassTag must be available for the element type of this traversable or iterator."

But I don't know how to do this: how do I define a ClassTag and give it to toArray? I tried to supply the implicit arg0 like this:

    val p = D.toArray(classTag[MyOwnType])

but it doesn't work.

Thanks for your help.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/questions-about-toArray-and-ClassTag-tp4496.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
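(For illustration, a minimal sketch of how a ClassTag can be made available to toArray; MyOwnType and the helper name are placeholders, and this shows the ClassTag mechanics only, not necessarily a fix for the REPL-specific ArrayStoreException above.)

    import scala.reflect.ClassTag

    // With a concrete element type, the compiler materializes the tag itself,
    // so no explicit argument is needed:
    case class MyOwnType(value: Int)
    val arr = Iterator(MyOwnType(1), MyOwnType(2)).toArray   // ClassTag[MyOwnType] found implicitly

    // With a generic element type, ask the caller for the tag via a context bound:
    def partitionToArray[T: ClassTag](iter: Iterator[T]): Iterator[T] = {
      val p = iter.toArray   // uses the implicit ClassTag[T] from the context bound
      // ... work on the array p ...
      p.toIterator
    }

    // placeholder usage: val b = a.mapPartitions(partitionToArray[MyOwnType] _)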
Task splitting among workers
During a Spark stage, how are tasks split among the workers? Specifically, for a HadoopRDD, who determines which worker gets which task?
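(Not an answer from the list, just an illustrative spark-shell sketch of the locality hints involved in the question: each HadoopRDD partition corresponds to an input split, and preferredLocations exposes the hosts holding that split's data, which the scheduler takes into account when placing tasks. The HDFS path is a placeholder.)

    // Placeholder path; run against a real file to see actual hosts.
    val hdfsRdd = sc.textFile("hdfs:///some/path/data.txt")

    hdfsRdd.partitions.foreach { p =>
      // For a HadoopRDD, each partition wraps an input split; preferredLocations
      // returns the hosts that store that split's blocks.
      println("partition " + p.index + " prefers " + hdfsRdd.preferredLocations(p))
    }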