extremely slow k-means version

2014-04-19 Thread ticup
Hi,

I was playing around with other k-means implementations in Scala/Spark in
order to test performance and get a better grasp on Spark.

Now, I made one similar to the one from the examples
(https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala),
except that it's a bit less clever. Nevertheless, I'd expect a non-expert
Scala/Spark programmer to write similar code instead of the code from the
example.

Here is how they compare, in the step that calculates the new centroids
(done by taking the average of all points belonging to the current
centroid - the main workhorse of the algorithm). The *example algorithm*
adds up the points of the same cluster and keeps track of the number of
points in each cluster in one step (by using reduceByKey and keeping a
counter in the reduce value):

val closest = data.map(p => (closestPoint(p, kPoints), (p, 1)))

val pointStats = closest.reduceByKey { case ((x1, y1), (x2, y2)) =>
  (x1 + x2, y1 + y2) }

and then divides the summed points of each cluster by the counted number
of points in that cluster:

val newPoints = pointStats.map { pair =>
  (pair._1, pair._2._1 / pair._2._2) }.collectAsMap()

Afterwards, the change in the new centroids is calculated in order to know
when to stop iterating:

tempDist = 0.0

for (i <- 0 until K) {
  tempDist += kPoints(i).squaredDist(newPoints(i))
}



*My algorithm*
(https://github.com/ticup/k-means-spark/blob/master/src/main/scala/k-means.scala)
is less clever but more straightforward: it just groups all the points of
each cluster, then calculates the average over those points and adds the
difference from the previous centroid to an accumulator:

// accumulator for the differences between new and old centroids
dist = sc.accumulator(0.0)

// calculate new centroids + add the difference to the old centroids
centroids = closest.groupByKey().map { case (i, points) =>
  val newCentroid = average(points)
  dist += centroids(i).squaredDist(newCentroid)
  newCentroid
}.collect()

with:

def average(points: Seq[Vector]): Vector = {
  points.reduce(_ + _) / points.length
}

So, the big differences are:

1. Use of an accumulator
2. Doing excessive work by not calculating the average cleverly
3. Accessing the centroids val from within the map


Now, the reason I'm here: this version runs EXTREMELY slowly and gets
OutOfMemoryError exceptions for data input that the original algorithm
easily solves in ~5 seconds. I'm trying to pinpoint what exactly is causing
this huge difference. The use of an accumulator shouldn't really affect the
performance, and it doesn't: I tried it without the accumulator and it
stays just as slow. Further, I'd expect the excessive work to slow the
algorithm down by a factor of 2 or so, but this is a slowdown by a factor
of 10 or more.

Even with 1 worker and 1 core (thus no parallelism) the difference in speed
stays the same, so it's not that the averaging isn't parallelised properly;
something much more important must be going on.

Could someone give me pointers on what exactly is happening here? Surely it
can't just be because I'm accessing the centroids value from within the
closure?

Speed comparison:

The *slow algorithm*: 44 seconds to perform the map
14/04/19 13:03:15 INFO scheduler.DAGScheduler: Stage 3 (map at
k-means.scala:114) finished in 43.909 s


The *fast algorithm*: more or less the same operations (in 2 steps instead
of 1) in 2.2 seconds

14/04/19 12:52:29 INFO scheduler.DAGScheduler: Stage 3 (reduceByKey at
k-means.scala:84) finished in 2.090 s

14/04/19 12:52:29 INFO scheduler.DAGScheduler: Stage 2 (collectAsMap at
k-means.scala:86) finished in 0.117 s


Thanks in advance,
Tim.






Re: ui broken in latest 1.0.0

2014-04-19 Thread Koert Kuipers
got it. makes sense. i am surprised it worked before...
On Apr 18, 2014 9:12 PM, Andrew Or and...@databricks.com wrote:

 Hi Koert,

 I've tracked down what the bug is. The caveat is that each StageInfo only
 keeps around the RDDInfo of the last RDD associated with the Stage. More
 concretely, if you have something like

 sc.parallelize(1 to 1000).persist.map(i => (i, i)).count()

 This creates two RDDs within one Stage, and the persisted RDD doesn't show
 up on the UI because it is not the last RDD of this stage. I filed a JIRA
 for this here: https://issues.apache.org/jira/browse/SPARK-1538.

 Thanks again for reporting this. I will push out a fix shortly.
 Andrew


 On Tue, Apr 8, 2014 at 1:30 PM, Koert Kuipers ko...@tresata.com wrote:

 our one cached RDD in this run has id 3




 *** onStageSubmitted **
 rddInfo: RDD 2 (2) Storage: StorageLevel(false, false, false, false,
 1); CachedPartitions: 0; TotalPartitions: 1; MemorySize: 0.0 B;TachyonSize:
 0.0 B; DiskSize: 0.0 B
 _rddInfoMap: Map(2 -> RDD 2 (2) Storage: StorageLevel(false, false,
 false, false, 1); CachedPartitions: 0; TotalPartitions: 1; MemorySize: 0.0
 B;TachyonSize: 0.0 B; DiskSize: 0.0 B)


 *** onTaskEnd **
 _rddInfoMap: Map(2 -> RDD 2 (2) Storage: StorageLevel(false, false,
 false, false, 1); CachedPartitions: 0; TotalPartitions: 1; MemorySize: 0.0
 B;TachyonSize: 0.0 B; DiskSize: 0.0 B)
 storageStatusList: List(StorageStatus(BlockManagerId(driver,
 192.168.3.169, 34330, 0),579325132,Map()))


 *** onStageCompleted **
 _rddInfoMap: Map()



 *** onStageSubmitted **
 rddInfo: RDD 7 (7) Storage: StorageLevel(false, false, false, false,
 1); CachedPartitions: 0; TotalPartitions: 1; MemorySize: 0.0 B;TachyonSize:
 0.0 B; DiskSize: 0.0 B
 _rddInfoMap: Map(7 -> RDD 7 (7) Storage: StorageLevel(false, false,
 false, false, 1); CachedPartitions: 0; TotalPartitions: 1; MemorySize: 0.0
 B;TachyonSize: 0.0 B; DiskSize: 0.0 B)


 *** updateRDDInfo **


 *** onTaskEnd **

 _rddInfoMap: Map(7 -> RDD 7 (7) Storage: StorageLevel(false, false,
 false, false, 1); CachedPartitions: 0; TotalPartitions: 1; MemorySize: 0.0
 B;TachyonSize: 0.0 B; DiskSize: 0.0 B)
  storageStatusList: List(StorageStatus(BlockManagerId(driver,
 192.168.3.169, 34330, 0),579325132,Map(rdd_3_0 ->
 BlockStatus(StorageLevel(false, true, false, true, 1),19944,0,0


 *** onStageCompleted **
 _rddInfoMap: Map()



 On Tue, Apr 8, 2014 at 4:20 PM, Koert Kuipers ko...@tresata.com wrote:

 1) at the end of the callback

 2) yes we simply expose sc.getRDDStorageInfo to the user via REST

 3) yes exactly. we define the RDDs at startup, all of them are cached.
 from that point on we only do calculations on these cached RDDs.

 i will add some more println statements for storageStatusList



 On Tue, Apr 8, 2014 at 4:01 PM, Andrew Or and...@databricks.com wrote:

 Hi Koert,

 Thanks for pointing this out. However, I am unable to reproduce this
 locally. It seems that there is a discrepancy between what the
 BlockManagerUI and the SparkContext think is persisted. This is strange
 because both sources ultimately derive this information from the same place
 - by doing sc.getExecutorStorageStatus. I have a couple of questions for
 you:

 1) In your print statements, do you print them in the beginning or at
 the end of each callback? It would be good to keep them at the end, since
 in the beginning the data structures have not been processed yet.
 2) You mention that you get the RDD info through your own API. How do
 you get this information? Is it through sc.getRDDStorageInfo?
 3) What did your application do to produce this behavior? Did you make
 an RDD, persist it once, and then use it many times afterwards or something
 similar?

 It would be super helpful if you could also print out what
 StorageStatusListener's storageStatusList looks like by the end of each
 onTaskEnd. I will continue to look into this on my side, but do let me know
 once you have any updates.

 Andrew


 On Tue, Apr 8, 2014 at 11:26 AM, Koert Kuipers ko...@tresata.com wrote:

 yet at same time i can see via our own api:

 storageInfo: {
 diskSize: 0,
 memSize: 19944,
 numCachedPartitions: 1,
 numPartitions: 1
 }



 On Tue, Apr 8, 2014 at 2:25 PM, Koert Kuipers ko...@tresata.com wrote:

 i put some println statements in BlockManagerUI

 i have RDDs that are cached in memory. I see this:


 *** onStageSubmitted **
 rddInfo: RDD 2 (2) Storage: StorageLevel(false, false, false,
 false, 1); CachedPartitions: 0; TotalPartitions: 1; MemorySize: 0.0
 B;TachyonSize: 0.0 B; DiskSize: 0.0 B
 _rddInfoMap: Map(2 - RDD 2 (2) Storage: StorageLevel(false, false,
 false, false, 1); CachedPartitions: 0; TotalPartitions: 

Re: ui broken in latest 1.0.0

2014-04-19 Thread Andrew Or
The reason it worked before is that the UI directly accessed
sc.getStorageStatus instead of getting the information through Task and
Stage events. This is not necessarily the best design, however, because it
closely couples the SparkContext and the SparkUI, and there is no way to
create a SparkUI independently of an application.



Re: extremely slow k-means version

2014-04-19 Thread Matei Zaharia
The problem is that groupByKey means “bring all the points with this same key 
to the same JVM”. Your input is a Seq[Point], so you have to have all the 
points there. This means that a) all points will be sent across the network in 
a cluster, which is slow (and Spark goes through this sending code path even in 
local mode so it serializes the data), and b) you’ll get out of memory errors 
if that Seq is too big. In large-scale data processing, data movement is often 
the biggest cost, so you have to carefully choose which operations to use.
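
For reference, a minimal sketch (untested) of that sum-and-count pattern,
assuming the org.apache.spark.util.Vector type from the examples and the
`closest` RDD from the original message:

import org.apache.spark.SparkContext._ // pair-RDD operations
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Vector

// Only small (sum, count) pairs cross the network, one per key per
// partition, because reduceByKey combines on the map side first.
def newCentroids(closest: RDD[(Int, (Vector, Int))]): collection.Map[Int, Vector] =
  closest
    .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
    .map { case (i, (sum, count)) => (i, sum / count) }
    .collectAsMap()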

Matei




Re: extremely slow k-means version

2014-04-19 Thread ticup
Thanks a lot for the explanation, Matei.

As a matter of fact, I was just reading up on the paper on narrow and wide
dependencies and saw that groupByKey indeed creates a wide dependency,
which, as you explained, is the problem.

Maybe it wouldn't be a bad idea to have a section in the docs on wide/narrow
dependencies, and maybe, for each transformation, the dependency it creates?
Although it's mostly obvious, it would better stress the fact that you need
to choose your transformations carefully and that some are much preferable
to others.
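
For instance, a quick way to see where a wide (shuffle) dependency shows up
in a lineage is toDebugString. A minimal sketch, assuming a running
SparkContext `sc` (not from the thread):

val pairs = sc.parallelize(1 to 100).map(i => (i % 10, i)) // map: narrow
println(pairs.reduceByKey(_ + _).toDebugString)            // reduceByKey: wide (shuffle)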







Re: Anyone using value classes in RDDs?

2014-04-19 Thread kamatsuoka
No, you can wrap other types in value classes as well.  You can try it in
the REPL:

scala> case class ID(val id: String) extends AnyVal
defined class ID
scala> val i = ID("foo")
i: ID = ID(foo)
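
One caveat worth noting (a Scala value-class rule, not anything
Spark-specific): a value class is actually allocated whenever it is used as
a generic type argument, which includes collection and RDD element types.
A small illustration, continuing in the REPL:

scala> def first[T](xs: Seq[T]): T = xs.head  // T is generic, so ID is boxed here
first: [T](xs: Seq[T])T
scala> first(Seq(ID("foo"), ID("bar")))
res0: ID = ID(foo)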


On Fri, Apr 18, 2014 at 4:14 PM, Koert Kuipers [via Apache Spark User List]
ml-node+s1001560n4475...@n3.nabble.com wrote:

 aren't value classes for primitives (AnyVal) only? that doesn't apply to
 String, which is an object (AnyRef)


 On Fri, Apr 18, 2014 at 2:51 PM, kamatsuoka [hidden email] wrote:

 I'm wondering if anyone has tried using value classes in RDDs?  My use
 case
 is that I have a number of RDDs containing strings, e.g.

 val r1: RDD[(String, (String, Int))] = ...
 val r2: RDD[(String, (String, Int))] = ...

 and it might be clearer if I wrote

 case class ID(val id: String) extends AnyVal
 case class Name(val id: String) extends AnyVal
 val r1: RDD[(ID, (Name, Int))] = ...
 val r2: RDD[(Name, (ID, Int))] = ...

 This seems like a pretty typical use case for value classes, but I haven't
 noticed anyone talking about it.  Although, I think you'd have to read
 through all of the Spark code paths to know whether allocation is required
 (http://docs.scala-lang.org/overviews/core/value-classes.html), so some
 comparative performance testing would be called for.











-- 
Kenji





Help with error initializing SparkR.

2014-04-19 Thread tongzzz
I can't initialize the sc context after a successful install on the
Cloudera QuickStart VM. This is the error message:

> library(SparkR)
Loading required package: rJava
[SparkR] Initializing with classpath
/usr/lib64/R/library/SparkR/sparkr-assembly-0.1.jar

> sc <- sparkR.init("local")
Error in .jcall("RJavaTools", "Ljava/lang/Object;", "invokeMethod", cl,  :
  java.lang.NoClassDefFoundError: scala.collection.immutable.Vector


I also created an issue on GitHub, which contains more details about this
problem:

https://github.com/amplab-extras/SparkR-pkg/issues/46


Thanks for any help.





questions about toArray and ClassTag

2014-04-19 Thread wxhsdp
Hi, all
  I'm quite new to Scala; I did some tests in the Spark shell:

val b = a.mapPartitions { D =>
  val p = D.toArray
  .
  p.toIterator
}

When a is an RDD of type RDD[Int], b.collect() works, but when I change a
to RDD[MyOwnType], b.collect() returns an error:

14/04/20 10:14:46 ERROR OneForOneStrategy:
[L$line5.$read$$iwC$$iwC$$iwC$$iwC$InputMatrix;
java.lang.ArrayStoreException:
[L$line5.$read$$iwC$$iwC$$iwC$$iwC$InputMatrix;
at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)

I figured out that the question is about toArray:

def toArray[B >: A](implicit arg0: ClassTag[B]): Array[B]

which returns an array containing all elements of this traversable or
iterator; a ClassTag must be available for the element type.

But I don't know how to do this: how do I define a ClassTag and pass it to
toArray? I tried to supply the implicit arg0 like this:

val p = D.toArray(classTag[MyOwnType])

but it doesn't work.

thanks for your help
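
For what it's worth, the usual pattern is a ClassTag context bound on your
own method, so the implicit is in scope for toArray. A minimal sketch
(hypothetical helper, untested):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// [T: ClassTag] brings an implicit ClassTag[T] into scope, which both
// mapPartitions and iter.toArray pick up automatically.
def perPartitionArrays[T: ClassTag](rdd: RDD[T]): RDD[T] =
  rdd.mapPartitions { iter =>
    val p = iter.toArray // uses the implicit ClassTag[T]
    // ... work on the array p ...
    p.toIterator
  }

Note also that the $iwC wrappers in the exception suggest MyOwnType was
defined inside the spark-shell; classes defined in the REPL get wrapped,
which can trip up array creation in ways a compiled class would not.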






Task splitting among workers

2014-04-19 Thread David Thomas
During a Spark stage, how are tasks split among the workers? Specifically,
for a HadoopRDD, what determines which worker gets which task?