Hi,
Below is the implementation of groupByKey (v0.8.0):
/**
 * Collects the values of each key into one sequence by delegating to
 * combineByKey with ArrayBuffer-based combiners.
 *
 * @param partitioner passed through unchanged to combineByKey
 * @return the combined RDD, viewed as pairs of key and Seq of values
 */
def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
  // Starts a new buffer containing the first value seen for a key.
  val startBuffer: V => ArrayBuffer[V] = first => ArrayBuffer(first)
  // Appends a further value to a key's existing buffer and returns that buffer.
  val appendValue: (ArrayBuffer[V], V) => ArrayBuffer[V] = (acc, next) => acc += next

  // The third argument (the mergeCombiners closure) is null and map-side
  // combining is switched off.
  // NOTE(review): presumably mergeCombiners is never invoked when
  // mapSideCombine is false -- confirm against combineByKey.
  val grouped = combineByKey[ArrayBuffer[V]](
    startBuffer,
    appendValue,
    null,
    partitioner,
    mapSideCombine = false)

  // The combined values are ArrayBuffer[V]; the cast exposes them as Seq[V].
  grouped.asInstanceOf[RDD[(K, Seq[V])]]
}
and combineValuesByKey (Aggregator.scala):
/**
 * Folds every (key, value) pair from `iter` into a per-key combiner held in
 * a hash map, then returns an iterator over that map.
 *
 * @param iter the key/value pairs to combine
 * @return an iterator over the (key, combiner) entries that were built
 */
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] = {
  val combiners = new JHashMap[K, C]
  iter.foreach { pair =>
    val existing = combiners.get(pair._1)
    // A null lookup result is treated as "no combiner yet for this key".
    val updated =
      if (existing == null) createCombiner(pair._2)
      else mergeValue(existing, pair._2)
    combiners.put(pair._1, updated)
  }
  combiners.iterator
}
My question is why null is being passed for the mergeCombiners closure.
If two different partitions have the same key, wouldn't their combiners
need to be merged afterwards?
Thanks,
Archit.