Hi,

Below is the implementation of groupByKey (v0.8.0):

    def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
      def createCombiner(v: V) = ArrayBuffer(v)
      def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
      val bufs = combineByKey[ArrayBuffer[V]](
        createCombiner _, mergeValue _, null, partitioner, mapSideCombine = false)
      bufs.asInstanceOf[RDD[(K, Seq[V])]]
    }

and combineValuesByKey (Aggregator.scala):

    def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] = {
      val combiners = new JHashMap[K, C]
      for (kv <- iter) {
        val oldC = combiners.get(kv._1)
        if (oldC == null) {
          combiners.put(kv._1, createCombiner(kv._2))
        } else {
          combiners.put(kv._1, mergeValue(oldC, kv._2))
        }
      }
      combiners.iterator
    }

My question is why null is passed for the mergeCombiners closure. If two different partitions contain the same key, wouldn't their combiners need to be merged afterwards?

Thanks,
Archit
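To make the question concrete, here is a plain-Scala simulation of the flow. It is not Spark code: `GroupByKeySketch`, `shuffle`, and the hash-partitioning rule are hypothetical stand-ins, and a Scala `mutable.HashMap` replaces `JHashMap`. The key assumption is that, with `mapSideCombine = false`, records are first shuffled to reduce partitions by key and only then passed through `combineValuesByKey`, using `createCombiner` and `mergeValue` alone.

```scala
import scala.collection.mutable

// Hypothetical stand-alone simulation; not Spark's implementation.
object GroupByKeySketch {
  // Same logic as combineValuesByKey in Aggregator.scala: only
  // createCombiner and mergeValue are used, never mergeCombiners.
  def combineValuesByKey[K, V, C](iter: Iterator[(K, V)],
                                  createCombiner: V => C,
                                  mergeValue: (C, V) => C): Iterator[(K, C)] = {
    val combiners = mutable.HashMap.empty[K, C]
    for ((k, v) <- iter) {
      combiners.get(k) match {
        case None    => combiners(k) = createCombiner(v)
        case Some(c) => combiners(k) = mergeValue(c, v)
      }
    }
    combiners.iterator
  }

  // Assumed hash partitioning: every record with a given key is routed
  // to the same reduce partition, whichever map partition it came from.
  def shuffle[K, V](mapOutputs: Seq[Seq[(K, V)]], numPartitions: Int): Seq[Seq[(K, V)]] = {
    val all = mapOutputs.flatten
    (0 until numPartitions).map { p =>
      all.filter { case (k, _) => Math.floorMod(k.hashCode, numPartitions) == p }
    }
  }

  // groupByKey without map-side combine: shuffle first, then combine
  // each reduce partition independently.
  def groupByKey[K, V](mapOutputs: Seq[Seq[(K, V)]], numPartitions: Int): Seq[Map[K, List[V]]] = {
    def createCombiner(v: V): mutable.ArrayBuffer[V] = mutable.ArrayBuffer(v)
    def mergeValue(buf: mutable.ArrayBuffer[V], v: V): mutable.ArrayBuffer[V] = buf += v
    shuffle(mapOutputs, numPartitions).map { part =>
      combineValuesByKey[K, V, mutable.ArrayBuffer[V]](part.iterator, createCombiner _, mergeValue _)
        .map { case (k, buf) => k -> buf.toList }
        .toMap
    }
  }

  def main(args: Array[String]): Unit = {
    // Two map-side partitions that both contain key "a".
    val mapOutputs = Seq(Seq("a" -> 1, "b" -> 2), Seq("a" -> 3, "c" -> 4))
    val reduced = groupByKey(mapOutputs, numPartitions = 2)
    reduced.zipWithIndex.foreach { case (m, i) => println(s"reduce partition $i: $m") }
  }
}
```

If that assumption holds, both values for "a" arrive in a single reduce partition, so `mergeValue` alone builds the complete group and there is never a second combiner for the same key to merge.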