*copy a Writable object if you expect to use the value after the next one
is read*

Thanks Matei. How does groupByKey handle this? Does it make copies of the
values' Writable objects? Is HadoopRDD the only RDD that has this
optimization of reusing Writable objects?

Ameet
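
To make the rule quoted at the top concrete, here is a minimal sketch in
plain Java (Writable is a Java interface, and no Spark or Hadoop is needed
to show the aliasing). The class and method names (ReusePitfall, MutableLong,
takeThree) are made up for illustration; MutableLong just stands in for a
Hadoop LongWritable:

```java
import java.util.ArrayList;
import java.util.List;

public class ReusePitfall {
    // Stand-in for Hadoop's LongWritable: a mutable box around a long.
    static class MutableLong {
        long value;
        long get() { return value; }
        void set(long v) { value = v; }
    }

    // Simulates take(3) against a reader that reuses one Writable
    // instance per record: we collect references while iterating and
    // only read the values afterwards.
    static List<MutableLong> takeThree() {
        MutableLong shared = new MutableLong(); // one instance, reused
        List<MutableLong> taken = new ArrayList<>();
        for (long key : new long[] {10, 20, 30}) {
            shared.set(key);   // the reader overwrites the shared instance
            taken.add(shared); // we only stored a reference!
        }
        return taken;
    }

    public static void main(String[] args) {
        // All three entries alias the same object, so all print 30.
        for (MutableLong m : takeThree()) System.out.println(m.get());
    }
}
```

This is exactly the take(10) symptom described below: the collected array
holds ten references to one object, whose contents are whatever the last
record set.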

On Sat, Aug 10, 2013 at 12:07 AM, Matei Zaharia <[email protected]> wrote:

> What happens is that as we iterate through the SequenceFile, we reuse the
> same IntWritable (or other Writable) instance for each record. So the
> rule is *copy a Writable object if you expect to use the value after the
> next one is read*. For example, in take(10), the first element is only
> looked at after you've read all 10 elements, which is too late. (Basically
> you're getting back an array with ten references to the same Writable
> object -- take a look by printing it to stdout). On the other hand, in the
> map() case, you call get() immediately after reading that object, and
> before reading the next one, so it's fine.
>
> This is definitely somewhat confusing, but it's just an optimization we
> made because in most cases you use the object right away and don't need to
> allocate another Writable. So as another general rule: if you want to keep
> a value around longer, convert it from a Writable to a "normal" Java type
> right away. Really it's take() and collect() that will be the most
> confusing.
>
> Matei
>
> On Aug 9, 2013, at 2:47 PM, Ameet Kini <[email protected]> wrote:
>
> >
> >
> > When iterating over a HadoopRDD created using SparkContext.sequenceFile,
> I noticed that if I don't copy the key as below, every tuple in the RDD has
> the same value as the last one seen. Clearly the object is being recycled,
> so if I don't clone the object, I'm in trouble.
> >
> > Say my sequence files have keys of type LongWritable:
> >
> > val hadoopRdd = sc.sequenceFile(..)
> > val filteredRdd = hadoopRdd.filter(..)
> >
> > Now if I run the code below to print the 10 keys as Longs, I see the same
> value printed 10 times:
> > filteredRdd.take(10).foreach(t => println(t._1.get()))
> >
> > Now if I copy the key out first, it prints the 10 distinct keys correctly:
> > val hadoopRdd = sc.sequenceFile(..)
> > val mappedRdd = hadoopRdd.map(t => (t._1.get(), t._2))
> > val filteredRdd = mappedRdd.filter(..)
> > filteredRdd.take(10).foreach(t => println(t._1))
> >
> > When are users expected to make such copies of objects when performing
> RDD operations?
> >
> > Ameet
>
>

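Ameet's map(t => (t._1.get(), t._2)) fix above works because get() copies
the primitive out *before* the next record overwrites the shared Writable.
Here is a minimal plain-Java sketch of the same idea (CopyFix, MutableLong,
and takeThreeCopied are hypothetical names for illustration; MutableLong
stands in for LongWritable):

```java
import java.util.ArrayList;
import java.util.List;

public class CopyFix {
    // Stand-in for Hadoop's LongWritable: a mutable box around a long.
    static class MutableLong {
        long value;
        long get() { return value; }
        void set(long v) { value = v; }
    }

    // Like map(t => t._1.get()) before take(): extract the value while
    // iterating, so each collected element is an independent copy.
    static List<Long> takeThreeCopied() {
        MutableLong shared = new MutableLong(); // one instance, reused
        List<Long> taken = new ArrayList<>();
        for (long key : new long[] {10, 20, 30}) {
            shared.set(key);
            taken.add(shared.get()); // boxed copy, independent of 'shared'
        }
        return taken;
    }

    public static void main(String[] args) {
        System.out.println(takeThreeCopied()); // [10, 20, 30]
    }
}
```

Because the list holds plain Long values rather than references to the
reused holder, collecting first and reading later is now safe.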