That was run on 0.8.0-incubating, which raises a question that has been
recurring to me of late: why are people continuing to use 0.8.0 months
after 0.8.1 came out and when 0.9.0 is in release candidates?  It
doesn't make a relevant difference in this case, but in general, chasing
bugs in code that is two generations out-of-date doesn't make for very
efficient development.  Spark is still pre-1.0 and is rapidly developing
software.  As such, you should expect that the pain of staying up-to-date
is less than the pain of falling months behind -- but there is no avoiding
pain in pre-1.0 software.  Once we reach more stability and more rigorous
versioning/release practices with 1.0, it will make more sense to stick
with a major.minor release for a while and only pick up the
major.minor.patchlevel increments, but we're not there yet.
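
On the original question of why null can be passed for mergeCombiners,
here is a minimal sketch in plain Scala (no Spark).  It assumes that with
mapSideCombine=false the shuffle delivers every raw value for a key to a
single partition before combineValuesByKey runs, so there are never two
combiners for the same key left to merge.  The names GroupByKeySketch,
shuffle and nonNegativeMod are made up for illustration and are not
Spark's actual code:

```scala
import scala.collection.mutable

object GroupByKeySketch {
  // Same shape as the combineValuesByKey quoted below: only createCombiner
  // and mergeValue are needed; there is no mergeCombiners parameter at all.
  def combineValuesByKey[K, V, C](
      iter: Iterator[(K, V)],
      createCombiner: V => C,
      mergeValue: (C, V) => C): Iterator[(K, C)] = {
    val combiners = mutable.LinkedHashMap.empty[K, C]
    for ((k, v) <- iter) {
      combiners.get(k) match {
        case None    => combiners(k) = createCombiner(v)
        case Some(c) => combiners(k) = mergeValue(c, v)
      }
    }
    combiners.iterator
  }

  // Keeps the partition index non-negative even for negative hash codes.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val r = x % mod
    if (r < 0) r + mod else r
  }

  // Hypothetical stand-in for the shuffle (an assumption, not Spark code):
  // each raw (key, value) record goes to the output partition chosen by the
  // key's hash, with no combining on the map side.
  def shuffle[K, V](parts: Seq[Seq[(K, V)]], numOut: Int): Seq[Seq[(K, V)]] = {
    val all = parts.flatten
    (0 until numOut).map { p =>
      all.filter { case (k, _) => nonNegativeMod(k.hashCode, numOut) == p }
    }
  }

  // After the shuffle, all values for a key sit in one partition, so a single
  // pass of combineValuesByKey per partition completes the grouping.
  def groupByKey[K, V](parts: Seq[Seq[(K, V)]], numOut: Int): Seq[Seq[(K, Seq[V])]] =
    shuffle(parts, numOut).map { part =>
      combineValuesByKey[K, V, mutable.ArrayBuffer[V]](
        part.iterator,
        v => mutable.ArrayBuffer(v),
        (buf, v) => buf += v
      ).map { case (k, buf) => (k, buf.toSeq) }.toSeq
    }

  def main(args: Array[String]): Unit = {
    // Four input partitions, each holding one "foo" and one "bar" record.
    val parts = (0 until 4).map(idx => Seq("foo" -> idx, "bar" -> idx))
    groupByKey(parts, 2).flatten.foreach(println)
  }
}
```

If map-side combining were enabled instead, each input partition would
produce its own partial combiner per key, and that is the case where a
real mergeCombiners function would be required.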


On Sun, Jan 26, 2014 at 1:45 PM, Archit Thakur <[email protected]> wrote:

> Which Spark version are you on?
>
>
> On Mon, Jan 27, 2014 at 3:12 AM, Mark Hamstra <[email protected]> wrote:
>
> > groupByKey does merge the values associated with the same key in
> > different partitions:
> >
> > scala> val rdd = sc.parallelize(List(1, 1, 1, 1), 4).mapPartitionsWithIndex(
> >   (idx, itr) => List(("foo", idx -> math.random), ("bar", idx -> math.random)).toIterator)
> >
> > scala> rdd.collect.foreach(println)
> >
> > (foo,(0,0.7387266457142971))
> > (bar,(0,0.06390701080780203))
> > (foo,(1,0.3601832111876926))
> > (bar,(1,0.5247725435958681))
> > (foo,(2,0.7486323021599729))
> > (bar,(2,0.9185837845634715))
> > (foo,(3,0.17591718413623136))
> > (bar,(3,0.12096331089133605))
> >
> > scala> rdd.groupByKey.collect.foreach(println)
> >
> > (foo,ArrayBuffer((0,0.8432285514154537), (1,0.3005967566708283),
> > (2,0.6150820518108783), (3,0.4779052219014124)))
> > (bar,ArrayBuffer((0,0.8190206253566251), (1,0.3465707665527258),
> > (2,0.5187789456090471), (3,0.9612998198743644)))
> >
> > (The random values differ from the first listing because rdd is not
> > cached, so each action recomputes it and calls math.random again.)
> >
> >
> > On Sun, Jan 26, 2014 at 12:22 PM, Archit Thakur <[email protected]> wrote:
> >
> > > Hi,
> > >
> > > Below is the implementation of groupByKey (v0.8.0):
> > >
> > >
> > > def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
> > >   def createCombiner(v: V) = ArrayBuffer(v)
> > >   def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
> > >   val bufs = combineByKey[ArrayBuffer[V]](
> > >     createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
> > >   bufs.asInstanceOf[RDD[(K, Seq[V])]]
> > > }
> > >
> > > and combineValuesByKey (Aggregator.scala):
> > >
> > > def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] = {
> > >   val combiners = new JHashMap[K, C]
> > >   for (kv <- iter) {
> > >     val oldC = combiners.get(kv._1)
> > >     if (oldC == null) {
> > >       combiners.put(kv._1, createCombiner(kv._2))
> > >     } else {
> > >       combiners.put(kv._1, mergeValue(oldC, kv._2))
> > >     }
> > >   }
> > >   combiners.iterator
> > > }
> > >
> > > My question is why null is passed for the mergeCombiners closure.
> > >
> > > If two different partitions contain the same key, wouldn't their
> > > values need to be merged afterwards?
> > >
> > > Thanks,
> > > Archit.
> > >
> >
>
