While I echo Mark's sentiment, versioning has nothing to do with this problem; it was already the case in Spark 0.8.0.

Note that mapSideCombine is turned off for groupByKey, so there is no need to merge any combiners. With map-side combining disabled, the shuffle routes every value for a given key to a single reduce partition, and combineValuesByKey builds that key's buffer there using only createCombiner and mergeValue; mergeCombiners would only matter if partial combiners had already been created on the map side.
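If it helps, here is a small standalone sketch of that reduce-side path. This is not the actual Spark code: simulateShuffle and combineValues are made-up names for illustration, and it assumes simple hash partitioning by key. The point is that once the shuffle has placed all values for a key in one partition, the per-partition combine never needs a mergeCombiners step.

import scala.collection.mutable.{ArrayBuffer, HashMap}

// Toy model of the reduce side when mapSideCombine = false.
// simulateShuffle / combineValues are illustrative names, not Spark APIs.

// The partitioner hashes each key to exactly one reduce partition,
// so all values for a key land together.
def simulateShuffle[K, V](records: Seq[(K, V)], numPartitions: Int): Seq[Seq[(K, V)]] = {
  val parts = Seq.fill(numPartitions)(ArrayBuffer.empty[(K, V)])
  for ((k, v) <- records)
    parts(math.abs(k.hashCode) % numPartitions) += ((k, v))
  parts.map(_.toSeq)
}

// Mirrors what combineValuesByKey does: only createCombiner and mergeValue
// are used; mergeCombiners is never needed because no key is split
// across partitions.
def combineValues[K, V](partition: Seq[(K, V)]): Map[K, Seq[V]] = {
  val combiners = HashMap.empty[K, ArrayBuffer[V]]
  for ((k, v) <- partition)
    combiners.get(k) match {
      case Some(buf) => buf += v                       // mergeValue
      case None      => combiners(k) = ArrayBuffer(v)  // createCombiner
    }
  combiners.map { case (k, buf) => (k, buf.toSeq) }.toMap
}

val records = Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5))
simulateShuffle(records, numPartitions = 2).map(combineValues).zipWithIndex.foreach {
  case (grouped, i) => println(s"partition $i: $grouped")
}
// Each key shows up in exactly one partition's output, so there is
// nothing left to merge across partitions afterwards.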
On Sun, Jan 26, 2014 at 12:22 PM, Archit Thakur <[email protected]> wrote:

> Hi,
>
> Below is the implementation of groupByKey (v0.8.0):
>
>   def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
>     def createCombiner(v: V) = ArrayBuffer(v)
>     def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
>     val bufs = combineByKey[ArrayBuffer[V]](
>       createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
>     bufs.asInstanceOf[RDD[(K, Seq[V])]]
>   }
>
> and combineValuesByKey (Aggregator.scala):
>
>   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] = {
>     val combiners = new JHashMap[K, C]
>     for (kv <- iter) {
>       val oldC = combiners.get(kv._1)
>       if (oldC == null) {
>         combiners.put(kv._1, createCombiner(kv._2))
>       } else {
>         combiners.put(kv._1, mergeValue(oldC, kv._2))
>       }
>     }
>     combiners.iterator
>   }
>
> My doubt is why null is being passed for the mergeCombiners closure.
>
> If two different partitions have the same key, wouldn't there be a need to merge them afterwards?
>
> Thanks,
> Archit.
