Using a case class as a key doesn't seem to work properly. [Spark 1.0.0]
A minimal example:
case class P(name: String)
val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
sc.parallelize(ps).map(x => (x, 1)).reduceByKey((x, y) => x + y).collect

[Spark shell local mode] res: Array[(P, Int)] = ...
Just to narrow down the issue: it looks like the problem is in 'reduceByKey'
and derivatives like 'distinct'.
groupByKey() seems to work:

sc.parallelize(ps).map(x => (x.name, 1)).groupByKey().collect

res: Array[(String, Iterable[Int])] = Array((charly,ArrayBuffer(1)),
(abe,ArrayBuffer(1)), ...
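For contrast with the failing reduceByKey above, here is a minimal plain-Scala sketch (no Spark, local collections only) showing that case class keys do combine correctly on their own, since the compiler generates structural equals/hashCode for them; the object name CaseClassKeyDemo is just for illustration:

```scala
case class P(name: String)

object CaseClassKeyDemo {
  val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))

  // groupBy relies on the case class's generated equals/hashCode,
  // so the two P("bob") instances land in the same group.
  val counts: Map[P, Int] =
    ps.groupBy(identity).map { case (k, vs) => (k, vs.length) }

  def main(args: Array[String]): Unit =
    println(counts) // P(bob) -> 2, P(alice) -> 1, P(charly) -> 1 (order may vary)
}
```

This suggests the failure is specific to how the shell/shuffle handles REPL-defined case classes, not to case class semantics themselves.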
I can confirm this bug. The behavior for groupByKey is the same as
reduceByKey - your example is actually grouping on just the name. Try this:
sc.parallelize(ps).map(x => (x, 1)).groupByKey().collect

res1: Array[(P, Iterable[Int])] = Array((P(bob),ArrayBuffer(1)),
(P(bob),ArrayBuffer(1)), ...
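The workaround implicit in the thread (keying on a plain field instead of the REPL-defined case class) can be sketched in plain Scala; in Spark the equivalent shape would be map(x => (x.name, 1)).reduceByKey(_ + _). NameKeyWorkaround is a hypothetical name:

```scala
case class P(name: String)

object NameKeyWorkaround {
  val ps = List(P("alice"), P("bob"), P("charly"), P("bob"))

  // Key on the String field rather than the case class, then sum per key.
  val counts: Map[String, Int] =
    ps.map(p => (p.name, 1))
      .groupBy(_._1)
      .map { case (k, vs) => (k, vs.map(_._2).sum) }

  def main(args: Array[String]): Unit =
    println(counts) // bob -> 2, alice -> 1, charly -> 1 (order may vary)
}
```

The trade-off is that the reduced result carries only the name, so any other fields of the case class have to be rejoined afterwards.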
Yes, right, my example was keying on the name:
'sc.parallelize(ps).map(x => (x.name, 1)).groupByKey().collect'
An oversight on my side.
Thanks!, Gerard.
On Tue, Jul 22, 2014 at 5:24 PM, Daniel Siegmann daniel.siegm...@velos.io
wrote:
I created https://issues.apache.org/jira/browse/SPARK-2620 to track this.
Maybe useful to know: this is a regression in Spark 1.0.0. I tested the
same sample code on 0.9.1 and it worked (we have several jobs using case
classes as aggregation keys, so it had better work).
-kr, Gerard.