*copy a Writable object if you expect to use the value after the next one is read*
Thanks Matei. How does groupByKey handle this? Does it make copies of the
values' Writable objects? Is HadoopRDD the only RDD that has this
optimization of reusing Writable objects?

Ameet

On Sat, Aug 10, 2013 at 12:07 AM, Matei Zaharia <[email protected]> wrote:

> What happens is that as we iterate through the SequenceFile, we reuse the
> same IntWritable (or other Writable) instance for each record. So the rule
> is *copy a Writable object if you expect to use the value after the next
> one is read*. For example, in take(10), the first element is only looked at
> after you've read all 10 elements, which is too late. (Basically you're
> getting back an array with ten references to the same Writable object --
> take a look by printing it to stdout.) On the other hand, in the map()
> case, you call get() immediately after reading that object, and before
> reading the next one, so it's fine.
>
> This is definitely somewhat confusing, but it's just an optimization we
> made because in most cases you use the object right away and don't need to
> allocate another Writable. So as another general rule, converting the
> object from a Writable to a "normal" Java type is another way to keep it
> around longer. Really it's take() and collect() that will be the most
> confusing.
>
> Matei
>
> On Aug 9, 2013, at 2:47 PM, Ameet Kini <[email protected]> wrote:
>
> > When iterating over a HadoopRDD created using SparkContext.sequenceFile,
> > I noticed that if I don't copy the key as below, every tuple in the RDD
> > has the same value as the last one seen. Clearly the object is being
> > recycled, so if I don't clone it, I'm in trouble.
> >
> > Say my sequence files have keys of type LongWritable:
> >
> > val hadoopRdd = sc.sequenceFile(..)
> > val filteredRdd = hadoopRdd.filter(..)
> >
> > Now if I run the below to print the 10 keys of type Long, I see the same
> > value printed 10 times:
> >
> > filteredRdd.take(10).foreach(t => println(t._1.get()))
> >
> > But if I copy the key out, it prints the 10 unique keys correctly:
> >
> > val hadoopRdd = sc.sequenceFile(..)
> > val mappedRdd = hadoopRdd.map(t => (t._1.get(), t._2))
> > val filteredRdd = mappedRdd.filter(..)
> > filteredRdd.take(10).foreach(t => println(t._1))
> >
> > When are users expected to make such copies of objects when performing
> > RDD operations?
> >
> > Ameet
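The reuse behavior Matei describes can be reproduced without Spark or Hadoop at all. Below is a minimal sketch in plain Scala: `ReusedWritable` is a hypothetical stand-in for a mutable Writable like `IntWritable`, and `records` plays the role of a SequenceFile reader that mutates and hands back the same instance for every record. Buffering the objects (as `take` does) yields many references to one object; extracting the primitive value immediately (as the `map(t => t._1.get())` fix does) is safe:

```scala
// Hypothetical stand-in for a mutable Hadoop Writable (e.g. IntWritable):
// one instance, mutated in place for each record.
class ReusedWritable {
  var value: Long = 0L
  def get: Long = value
}

object WritableReuseDemo {
  // Simulated record reader: yields the SAME object for every record,
  // the way SequenceFile readers reuse a single Writable instance.
  def records(n: Int): Iterator[ReusedWritable] = {
    val shared = new ReusedWritable
    Iterator.tabulate(n) { i => shared.value = i; shared }
  }

  def main(args: Array[String]): Unit = {
    // Buffering first, like take(10): every array slot references the
    // shared object, which now holds the last value written (9).
    val buffered = records(10).toArray
    assert(buffered.map(_.get).forall(_ == 9L))

    // Extracting the primitive as each record is read, like
    // map(t => t._1.get()): the values are copied out in time.
    val copied = records(10).map(_.get).toArray
    assert(copied.sameElements(0L until 10L))

    println("demo passed")
  }
}
```

This is only a model of the mechanism, not Spark's actual reader code, but it shows why the rule is stated in terms of *when* the value is used: the shared object is valid only until the next record overwrites it.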
