Recording the outcome here for posterity. Based on Sean's advice I've confirmed that making defensive copies of any records that will be collected avoids the problem. It does seem like Avro is being a bit too aggressive in deciding it's safe to reuse an object for a new record.
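For anyone who hits the same thing, the fix looks roughly like this. This is a minimal sketch, assuming Thing is the Avro-generated SpecificRecord class from the snippet quoted below and pairRDD comes from newAPIHadoopRDD(); SpecificData.deepCopy() is one way to make the copy, and Thing.newBuilder(e).build() would work just as well:

import org.apache.avro.specific.SpecificData;
import org.apache.spark.api.java.JavaRDD;

// Hadoop InputFormats reuse the record object they hand back, so
// deep-copy each element before any operation that needs all the
// elements to exist independently at once (e.g. collect()).
JavaRDD<Thing> rdd = pairRDD.values()
        .map(e -> SpecificData.get().deepCopy(e.getSchema(), e));

// Every element of the collected list is now an independent object.
rdd.collect().forEach(e -> System.out.println("Collected Foreach: " + e));

This is exactly Sean's suggestion of mapping each object to a clone before the collect().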
On 18 December 2014 at 21:50, Sean Owen <so...@cloudera.com> wrote:
>
> Being mutable is fine; reusing and mutating the objects is the issue.
> And yes the objects you get back from Hadoop are reused by Hadoop
> InputFormats. You should just map the objects to a clone before using
> them where you need them to exist all independently at once, like
> before a collect().
>
> (That said... generally speaking collect() involves copying from
> workers to the driver, which necessarily means a copy anyway. I
> suspect this isn't working that way for you since you're running it
> all locally?)
>
> On Thu, Dec 18, 2014 at 10:42 AM, Tristan Blakers <tris...@blackfrog.org>
> wrote:
> > Suspected the same thing, but because the underlying data classes are
> > deserialised by Avro I think they have to be mutable as you need to
> > provide the no-args constructor with settable fields.
> >
> > Nothing is being cached in my code anywhere, and this can be reproduced
> > using data directly out of the newAPIHadoopRDD() call. Debugs added to
> > the constructors of the various classes show that the right number are
> > being constructed, though the watches set on some of the fields aren't
> > always triggering, so I suspect maybe the serialisation is doing
> > something a bit too clever?
> >
> > Tristan
> >
> > On 18 December 2014 at 21:25, Sean Owen <so...@cloudera.com> wrote:
> >>
> >> It sounds a lot like your values are mutable classes and you are
> >> mutating or reusing them somewhere? It might work until you actually
> >> try to materialize them all and find many point to the same object.
> >>
> >> On Thu, Dec 18, 2014 at 10:06 AM, Tristan Blakers <tris...@blackfrog.org>
> >> wrote:
> >> > Hi,
> >> >
> >> > I'm getting some seemingly invalid results when I collect an RDD.
> >> > This is happening in both Spark 1.1.0 and 1.2.0, using Java 8 on Mac.
> >> >
> >> > See the following code snippet:
> >> >
> >> > JavaRDD<Thing> rdd = pairRDD.values();
> >> > rdd.foreach( e -> System.out.println( "RDD Foreach: " + e ) );
> >> > rdd.collect().forEach( e -> System.out.println( "Collected Foreach: " + e ) );
> >> >
> >> > I would expect the results from the two outputters to be identical,
> >> > but instead I see:
> >> >
> >> > RDD Foreach: Thing1
> >> > RDD Foreach: Thing2
> >> > RDD Foreach: Thing3
> >> > RDD Foreach: Thing4
> >> > (…snip…)
> >> > Collected Foreach: Thing1
> >> > Collected Foreach: Thing1
> >> > Collected Foreach: Thing1
> >> > Collected Foreach: Thing2
> >> >
> >> > So essentially all the valid entries except for one are replaced by
> >> > an equivalent number of duplicate objects. I've tried various map
> >> > and filter operations, but the results in the RDD always appear
> >> > correct until I try to collect() the results. I've also found that
> >> > calling cache() on the RDD materialises the duplication process such
> >> > that the RDD Foreach displays the duplicates too...
> >> >
> >> > Any suggestions for how I can go about debugging this would be
> >> > massively appreciated.
> >> >
> >> > Cheers
> >> > Tristan