Hi,

Below is the implementation of groupByKey (v0.8.0):


/**
 * Builds, through combineByKey, an RDD that pairs each key with a buffer of
 * values, and returns it typed as RDD[(K, Seq[V])].
 *
 * @param partitioner forwarded unchanged to combineByKey
 * @return the combineByKey result, cast to RDD[(K, Seq[V])]
 */
def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
    // Starts a new buffer holding the given value.
    def createCombiner(v: V) = ArrayBuffer(v)
    // Appends one more value to an existing buffer (in place, via +=).
    def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
    // The third argument (the mergeCombiners closure) is deliberately null and
    // map-side combining is switched off.
    // NOTE(review): presumably combineByKey never invokes mergeCombiners when
    // mapSideCombine=false -- confirm against combineByKey, which is not shown here.
    val bufs = combineByKey[ArrayBuffer[V]](
      createCombiner _, mergeValue _, null, partitioner,
mapSideCombine=false)
    // Re-type the result to the declared return type; asInstanceOf itself
    // performs no conversion of the underlying data.
    bufs.asInstanceOf[RDD[(K, Seq[V])]]
  }

and combineValuesByKey (Aggregator.scala):

/**
 * Folds the key/value pairs of `iter` into one combiner per distinct key.
 *
 * The first value seen for a key is turned into a combiner with
 * createCombiner; every later value for that key is folded into the stored
 * combiner with mergeValue. Only the pairs produced by `iter` are read.
 *
 * @param iter key/value pairs to combine
 * @return an iterator obtained from the map of key -> combiner
 */
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K,
C)] = {
    // Per-key accumulator for this call.
    val combiners = new JHashMap[K, C]
    for (kv <- iter) {
      // NOTE(review): assumes JHashMap is java.util.HashMap, whose get returns
      // null for a missing key -- confirm against the file's imports.
      val oldC = combiners.get(kv._1)
      if (oldC == null) {
        // No combiner stored for this key yet: create one from the value.
        combiners.put(kv._1, createCombiner(kv._2))
      } else {
        // Key already present: store the result of merging the value in.
        combiners.put(kv._1, mergeValue(oldC, kv._2))
      }
    }
    // NOTE(review): presumably relies on a Java-to-Scala collection conversion
    // in scope to yield Iterator[(K, C)]; that conversion is not visible here.
    combiners.iterator
  }

My question is why null is being passed for the mergeCombiners closure.

If two different partitions contain the same key, wouldn't their combiners
need to be merged afterwards?

Thanks,
Archit.

Reply via email to