groupByKey does merge the values associated with the same key in different
partitions:
scala> val rdd = sc.parallelize(List(1, 1, 1, 1), 4).mapPartitionsWithIndex((idx, itr) =>
     |   List(("foo", idx -> math.random), ("bar", idx -> math.random)).toIterator)
scala> rdd.collect.foreach(println)
(foo,(0,0.7387266457142971))
(bar,(0,0.06390701080780203))
(foo,(1,0.3601832111876926))
(bar,(1,0.5247725435958681))
(foo,(2,0.7486323021599729))
(bar,(2,0.9185837845634715))
(foo,(3,0.17591718413623136))
(bar,(3,0.12096331089133605))
scala> rdd.groupByKey.collect.foreach(println)
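(foo,ArrayBuffer((0,0.8432285514154537), (1,0.3005967566708283),
(2,0.6150820518108783), (3,0.4779052219014124)))
(bar,ArrayBuffer((0,0.8190206253566251), (1,0.3465707665527258),
(2,0.5187789456090471), (3,0.9612998198743644)))
(The random values differ from the first collect because the RDD is not
cached, so math.random is re-evaluated for each action. The point is that
each key ends up with one value from every partition, 0 through 3.)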
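As for why null is fine for mergeCombiners: my understanding is that with
mapSideCombine=false the raw (key, value) pairs are shuffled first, so all
values for a key are already in one partition before any combiner is built,
and only createCombiner and mergeValue are ever called. Here is a rough
plain-Scala simulation of that order of operations. It does not use Spark,
and every name in it (GroupByKeySketch, shuffle, combineValues, the sample
values) is made up for illustration.

```scala
import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}

// Plain-Scala simulation of "shuffle first, combine afterwards".
// No Spark involved; all names and values are illustrative assumptions.
object GroupByKeySketch {
  type Record = (String, (Int, Double))

  // Four "map-side" partitions, each holding one record per key.
  val mapPartitions: Seq[Seq[Record]] =
    (0 until 4).map(idx => Seq(("foo", (idx, idx * 0.1)), ("bar", (idx, idx * 0.2))))

  // Shuffle: route every raw record to a reduce partition by key hash,
  // so all records for a given key land in the same partition.
  def shuffle(parts: Seq[Seq[Record]], numReducers: Int): Map[Int, Seq[Record]] =
    parts.flatten.groupBy { case (k, _) => math.abs(k.hashCode % numReducers) }

  // Reduce side: build one buffer per key using only the equivalents of
  // createCombiner and mergeValue. No mergeCombiners step is needed,
  // because no other partition holds a buffer for the same key.
  def combineValues(records: Seq[Record]): Map[String, List[(Int, Double)]] = {
    val combiners = LinkedHashMap.empty[String, ArrayBuffer[(Int, Double)]]
    for ((k, v) <- records) {
      combiners.get(k) match {
        case None      => combiners(k) = ArrayBuffer(v) // createCombiner
        case Some(buf) => buf += v                      // mergeValue
      }
    }
    combiners.map { case (k, buf) => k -> buf.toList }.toMap
  }

  // Combine each reduce partition independently and collect the results.
  def grouped: Map[String, List[(Int, Double)]] =
    shuffle(mapPartitions, numReducers = 2).values.flatMap(combineValues).toMap

  def main(args: Array[String]): Unit =
    grouped.foreach(println)
}
```

If the combining happened on the map side instead, each partition would
produce its own buffer per key and a real mergeCombiners function would be
needed to join those buffers after the shuffle.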
On Sun, Jan 26, 2014 at 12:22 PM, Archit Thakur wrote:
> Hi,
>
> Below is the implementation of groupByKey (v0.8.0):
>
>
> def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
>   def createCombiner(v: V) = ArrayBuffer(v)
>   def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
>   val bufs = combineByKey[ArrayBuffer[V]](
>     createCombiner _, mergeValue _, null, partitioner,
>     mapSideCombine=false)
>   bufs.asInstanceOf[RDD[(K, Seq[V])]]
> }
>
> and combineValuesByKey (Aggregator.scala):
>
> def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
>   val combiners = new JHashMap[K, C]
>   for (kv <- iter) {
>     val oldC = combiners.get(kv._1)
>     if (oldC == null) {
>       combiners.put(kv._1, createCombiner(kv._2))
>     } else {
>       combiners.put(kv._1, mergeValue(oldC, kv._2))
>     }
>   }
>   combiners.iterator
> }
>
> My question is why null is passed for the mergeCombiners closure.
>
> If two different partitions contain the same key, wouldn't their
> combiners need to be merged afterwards?
>
> Thanks,
> Archit.
>