Thanks, Mark and Reynold, for the quick responses.

On Mon, Jan 27, 2014 at 5:07 AM, Reynold Xin <[email protected]> wrote:

> While I echo Mark's sentiment, versioning has nothing to do with this
> problem; the behavior was the same even in Spark 0.8.0.
>
> Note that mapSideCombine is turned off for groupByKey, so there is no need
> to merge any combiners.
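
To make that concrete: with mapSideCombine = false the map side ships raw
(K, V) pairs, the partitioner routes every pair with a given key to a single
reduce partition, and combiners are built there from scratch via
createCombiner/mergeValue. The mergeCombiners argument is never consulted on
that path, which is why groupByKey can pass null for it. Below is a minimal
standalone sketch of that reduce-side path (not Spark's actual code; the
object name and simplified signature are invented for illustration):

// A standalone sketch (not Spark's actual code; names are invented)
// of the reduce-side path used when mapSideCombine = false.
import scala.collection.mutable.{ArrayBuffer, HashMap}

object NoMapSideCombineSketch {
  // The shuffle has already routed every pair with the same key to
  // this one partition, so each key is combined exactly once, here:
  def combineValuesByKey[K, V, C](
      iter: Iterator[(K, V)],
      createCombiner: V => C,
      mergeValue: (C, V) => C): Iterator[(K, C)] = {
    val combiners = HashMap.empty[K, C]
    for ((k, v) <- iter) {
      combiners(k) = combiners.get(k) match {
        case None    => createCombiner(v) // first value seen for this key
        case Some(c) => mergeValue(c, v)  // every later value
      }
    }
    // mergeCombiners is never needed on this path, hence the null.
    combiners.iterator
  }

  def main(args: Array[String]): Unit = {
    // Pretend these pairs came from two different map partitions;
    // after the shuffle they all sit in one reduce partition.
    val shuffled = Iterator(("a", 1), ("b", 2), ("a", 3))
    combineValuesByKey[String, Int, ArrayBuffer[Int]](
      shuffled, v => ArrayBuffer(v), (buf, v) => buf += v)
      .foreach(println) // (a,ArrayBuffer(1, 3)) and (b,ArrayBuffer(2)), in hash order
  }
}
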
>
>
> On Sun, Jan 26, 2014 at 12:22 PM, Archit Thakur
> <[email protected]>wrote:
>
> > Hi,
> >
> > Below is the implementation for GroupByKey. (v, 0.8.0)
> >
> >
> > def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
> >   def createCombiner(v: V) = ArrayBuffer(v)
> >   def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
> >   val bufs = combineByKey[ArrayBuffer[V]](
> >     createCombiner _, mergeValue _, null, partitioner,
> >     mapSideCombine = false)
> >   bufs.asInstanceOf[RDD[(K, Seq[V])]]
> > }
> >
> > and combineValuesByKey (Aggregator.scala):
> >
> > def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] = {
> >   val combiners = new JHashMap[K, C]
> >   for (kv <- iter) {
> >     val oldC = combiners.get(kv._1)
> >     if (oldC == null) {
> >       combiners.put(kv._1, createCombiner(kv._2))
> >     } else {
> >       combiners.put(kv._1, mergeValue(oldC, kv._2))
> >     }
> >   }
> >   combiners.iterator
> > }
> >
> > My question is: why is null being passed for the mergeCombiners closure?
> >
> > If two different partitions contain the same key, wouldn't the
> > resulting combiners need to be merged afterwards? (A contrasting
> > sketch of that merge path follows the quoted thread below.)
> >
> > Thanks,
> > Archit.
> >
>
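
On the question above about keys appearing in several partitions: the
partitioner guarantees that all pairs with the same key land in the same
reduce partition, so nothing needs merging afterwards for groupByKey. For
contrast, here is a hypothetical sketch of the path that does need
mergeCombiners, i.e. when mapSideCombine = true and each map task ships
pre-combined partial results (again invented names, not Spark's code):

import scala.collection.mutable.HashMap

object MapSideCombineSketch {
  // When mapSideCombine = true, each map task emits partial combiners.
  // Several map tasks can emit a partial for the same key, so the
  // reduce side must merge them with mergeCombiners:
  def combineCombinersByKey[K, C](
      iter: Iterator[(K, C)],
      mergeCombiners: (C, C) => C): Iterator[(K, C)] = {
    val combiners = HashMap.empty[K, C]
    for ((k, c) <- iter) {
      combiners(k) = combiners.get(k) match {
        case None       => c                       // first partial for this key
        case Some(oldC) => mergeCombiners(oldC, c) // merge partials from other tasks
      }
    }
    combiners.iterator
  }

  def main(args: Array[String]): Unit = {
    // Two map tasks each pre-combined values for key "a":
    val partials = Iterator(("a", List(1, 2)), ("a", List(3)))
    combineCombinersByKey[String, List[Int]](partials, _ ++ _)
      .foreach(println) // prints (a,List(1, 2, 3))
  }
}

groupByKey skips the map-side combine because it keeps every value anyway,
so pre-combining would not shrink the shuffled data; reduceByKey and friends
do use it, which is why all three closures matter there.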
