Yes, I see where HadoopRDD feeds the record reader the key/value objects to read into next, and how they are not reallocated on each read. It might be worthwhile to update the "Hadoop Datasets" section of the programming guide with a one-liner reflecting this requirement on the user:
https://github.com/mesos/spark/wiki/Spark-Programming-Guide
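If such a note is added, a minimal sketch along these lines might illustrate it (the path and the LongWritable/Text key/value classes below are just placeholders, not from the thread):

import org.apache.hadoop.io.{LongWritable, Text}

// sc is the SparkContext, as in the examples below.
// Records read from a SequenceFile reuse the same Writable instances, so
// convert keys/values to plain values before calling take() or collect().
// The path and key/value classes here are hypothetical placeholders.
val rdd = sc.sequenceFile("hdfs://.../data.seq", classOf[LongWritable], classOf[Text])

// May print the same key 10 times: the returned array holds 10 references
// to the same recycled LongWritable.
rdd.take(10).foreach(t => println(t._1.get()))

// Copy each record into immutable values first; then take()/collect() behave as expected.
val copied = rdd.map { case (k, v) => (k.get(), v.toString) }
copied.take(10).foreach(t => println(t._1))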
Thanks,
Ameet

On Sat, Aug 10, 2013 at 9:52 PM, Matei Zaharia <[email protected]> wrote:

> Yes, HadoopRDD is the only place where this matters, unless you pass the
> Writables out of there. For our groupBy/reduce operations, we don't use
> Hadoop Writable serialization. In fact, you might note that you can group
> and reduce any kind of object in Spark, not just subclasses of Writable.
>
> Matei
>
> On Aug 10, 2013, at 6:20 PM, Ameet Kini <[email protected]> wrote:
>
> *copy a Writable object if you expect to use the value after the next one
> is read*
>
> Thanks Matei. How does groupByKey handle this? Does it make copies of the
> values' Writable objects? Is HadoopRDD the only one that has this
> optimization of reusing Writable objects?
>
> Ameet
>
> On Sat, Aug 10, 2013 at 12:07 AM, Matei Zaharia <[email protected]> wrote:
>
>> What happens is that as we iterate through the SequenceFile, we reuse the
>> same IntWritable (or other Writable) instances for each record. So the
>> rule is *copy a Writable object if you expect to use the value after the
>> next one is read*. For example, in take(10), the first element is only
>> looked at after you've read all 10 elements, which is too late. (Basically
>> you're getting back an array with ten references to the same Writable
>> object -- take a look by printing it to stdout). On the other hand, in the
>> map() case, you call get() immediately after reading that object, and
>> before reading the next one, so it's fine.
>>
>> This is definitely somewhat confusing, but it's just an optimization we
>> made because in most cases you use the object right away and don't need to
>> allocate another Writable. So, as another general rule, just convert the
>> object from a Writable to a "normal" Java type if you want to keep it
>> around longer. Really it's take() and collect() that will be the most
>> confusing.
>>
>> Matei
>>
>> On Aug 9, 2013, at 2:47 PM, Ameet Kini <[email protected]> wrote:
>>
>> > When iterating over a HadoopRDD created using SparkContext.sequenceFile,
>> > I noticed that if I don't copy the key as below, every tuple in the RDD
>> > has the same value as the last one seen. Clearly the object is being
>> > recycled, so if I don't clone the object, I'm in trouble.
>> >
>> > Say my sequence files have keys of type LongWritable:
>> >
>> > val hadoopRdd = sc.sequenceFile(..)
>> > val filteredRdd = hadoopRdd.filter(..)
>> >
>> > Now if I run the below to print the 10 keys of type Long, I see the
>> > same value printed 10 times:
>> >
>> > filteredRdd.take(10).foreach(t => println(t._1.get()))
>> >
>> > Now if I copy the key out, it prints the 10 unique keys correctly:
>> >
>> > val hadoopRdd = sc.sequenceFile(..)
>> > val mappedRdd = hadoopRdd.map(t => (t._1.get(), t._2))
>> > val filteredRdd = mappedRdd.filter(..)
>> > filteredRdd.take(10).foreach(t => println(t._1))
>> >
>> > When are users expected to make such copies of objects when performing
>> > RDD operations?
>> >
>> > Ameet
