What happens is that as we iterate through the SequenceFile, we reuse the same IntWritable (or other Writable) instance for each record. So the rule is *copy a Writable object if you expect to use its value after the next record is read*. For example, in take(10), the first element is only looked at after you've read all 10 elements, which is too late. (Basically you're getting back an array with ten references to the same Writable object -- take a look by printing it to stdout.) On the other hand, in the map() case, you call get() immediately after reading that object, and before reading the next one, so it's fine.
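The effect can be reproduced outside Hadoop with a small sketch in plain Java. Here a hypothetical MutableInt stands in for IntWritable, and a one-holder iterator stands in for the SequenceFile reader; collecting the raw holders gives N references to one object, while calling get() before the next read preserves each value:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ReuseDemo {
    // Stand-in for Hadoop's IntWritable: a mutable holder that gets reused.
    static final class MutableInt {
        private int value;
        void set(int v) { value = v; }
        int get() { return value; }
    }

    // Mimics the SequenceFile reader: one holder instance, mutated per record.
    static Iterator<MutableInt> records(int... values) {
        final MutableInt holder = new MutableInt();
        return new Iterator<MutableInt>() {
            int i = 0;
            public boolean hasNext() { return i < values.length; }
            public MutableInt next() { holder.set(values[i++]); return holder; }
        };
    }

    public static void main(String[] args) {
        // take()-style: collect the raw "Writables". Every element is the
        // same object, so all of them show the last value read.
        List<MutableInt> raw = new ArrayList<>();
        Iterator<MutableInt> it = records(1, 2, 3);
        while (it.hasNext()) raw.add(it.next());
        System.out.println(raw.get(0).get() + " " + raw.get(1).get() + " " + raw.get(2).get());
        // prints "3 3 3"

        // map(_.get())-style: extract the value before the next read.
        List<Integer> copied = new ArrayList<>();
        it = records(1, 2, 3);
        while (it.hasNext()) copied.add(it.next().get());
        System.out.println(copied);
        // prints "[1, 2, 3]"
    }
}
```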
This is admittedly confusing, but it's an optimization we made because in most cases you use the object right away and don't need to allocate a new Writable per record. So as another general rule: if you want to keep a value around longer, convert it from a Writable to a "normal" Java type first. It's take() and collect() that are the most likely to trip you up.

Matei

On Aug 9, 2013, at 2:47 PM, Ameet Kini <[email protected]> wrote:

> When iterating over a HadoopRDD created using SparkContext.sequenceFile, I
> noticed that if I don't copy the key as below, every tuple in the RDD has the
> same value as the last one seen. Clearly the object is being recycled, so if
> I don't clone the object, I'm in trouble.
>
> Say my sequence files have keys of type LongWritable:
>
> val hadoopRdd = sc.sequenceFile(..)
> val filteredRdd = hadoopRdd.filter(..)
>
> Now if I run the below to print the 10 keys of type Long, I see the same
> value printed 10 times:
>
> filteredRdd.take(10).foreach(t => println(t._1.get()))
>
> But if I copy the key out first, it prints the 10 unique keys correctly:
>
> val hadoopRdd = sc.sequenceFile(..)
> val mappedRdd = hadoopRdd.map(t => (t._1.get(), t._2))
> val filteredRdd = mappedRdd.filter(..)
> filteredRdd.take(10).foreach(t => println(t._1))
>
> When are users expected to make such copies of objects when performing RDD
> operations?
>
> Ameet
