We keep running into bugs that MemPipeline tests do not catch but that do show up on the cluster, because of the way Avro reuses objects when reading. If you do a parallelDo on a PGroupedTable, each value you get from the Iterable is actually the same object handed back to you with different contents. If you store some of these instances to emit later without taking a copy, you get unexpected results.
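To make the failure mode concrete, here is a minimal plain-Java sketch with no Avro or Crunch involved. DemoRecord, ReusingIterable and ReuseDemo are made-up names for illustration only: DemoRecord stands in for a SpecificRecord, and ReusingIterable imitates a reader that refills one shared instance on every call to next().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical mutable record, standing in for an Avro SpecificRecord. */
class DemoRecord {
    int id;

    DemoRecord(int id) { this.id = id; }

    /** Copy constructor: the "take a copy" step from the text. */
    DemoRecord(DemoRecord other) { this.id = other.id; }
}

/** Hypothetical Iterable that hands back one shared instance with new contents. */
class ReusingIterable implements Iterable<DemoRecord> {
    private final int[] ids;

    ReusingIterable(int... ids) { this.ids = ids; }

    @Override
    public Iterator<DemoRecord> iterator() {
        return new Iterator<DemoRecord>() {
            private final DemoRecord shared = new DemoRecord(0);
            private int pos = 0;

            @Override
            public boolean hasNext() { return pos < ids.length; }

            @Override
            public DemoRecord next() {
                if (!hasNext()) throw new NoSuchElementException();
                shared.id = ids[pos++]; // overwrite the same object in place
                return shared;
            }
        };
    }
}

class ReuseDemo {
    /** Buggy pattern: stores references, so every entry aliases the shared object. */
    static List<Integer> keepReferences(Iterable<DemoRecord> values) {
        List<DemoRecord> kept = new ArrayList<>();
        for (DemoRecord r : values) kept.add(r);
        return ids(kept);
    }

    /** Safe pattern: copies each value before storing it. */
    static List<Integer> keepCopies(Iterable<DemoRecord> values) {
        List<DemoRecord> kept = new ArrayList<>();
        for (DemoRecord r : values) kept.add(new DemoRecord(r));
        return ids(kept);
    }

    private static List<Integer> ids(List<DemoRecord> records) {
        List<Integer> out = new ArrayList<>();
        for (DemoRecord r : records) out.add(r.id);
        return out;
    }
}
```

With input ids 1, 2, 3, keepReferences sees the last value three times, while keepCopies sees 1, 2, 3. An in-memory test that feeds distinct objects to the same function would pass in both cases, which is why the bug only shows up on the cluster.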
To catch these problems locally, we've written a wrapper for Iterable<? extends SpecificRecord> that, on each iteration, uses a SpecificDatumWriter and a SpecificDatumReader to write the record to a ByteBuffer and read it back, offering the previously returned record for reuse. This lets us run the offending MapFn or DoFn in isolation, with a wrapped Iterable as input, to identify and fix the problem. It is a little awkward, though, and it would be much neater if this were driven from the Crunch side. At the point in the MapShuffler where the Iterable is wrapped in a SingleUseIterable, it could also be wrapped in one of these AvroReadSimulatingIterables, so that people could find these problems before ever running a pipeline live. However, the problem is specific to Avro, so it makes no sense to hack it straight into HFunction. Does anyone have thoughts on how this could be integrated nicely, or indeed whether it should be integrated at all?
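As a rough idea of the shape of such a wrapper, here is a stripped-down sketch that uses only the JDK. It is not our actual AvroReadSimulatingIterable: the class names are hypothetical, and the Avro write/read round trip is replaced by a pluggable readInto function that receives the source value and the previously returned instance (null on the first call). In an Avro version, that function would be the place to serialize with the SpecificDatumWriter and read back with the SpecificDatumReader, passing the previous record as the reuse argument.

```java
import java.util.Iterator;
import java.util.function.BiFunction;

/**
 * Hypothetical wrapper that makes any Iterable behave like a reusing reader:
 * each call to next() may return the same instance with new contents.
 */
class ReuseSimulatingIterable<T> implements Iterable<T> {
    private final Iterable<T> source;
    // (sourceValue, previousInstanceOrNull) -> instance to hand out.
    // Assumption: this stands in for an Avro write-then-read-with-reuse round trip.
    private final BiFunction<T, T, T> readInto;

    ReuseSimulatingIterable(Iterable<T> source, BiFunction<T, T, T> readInto) {
        this.source = source;
        this.readInto = readInto;
    }

    @Override
    public Iterator<T> iterator() {
        final Iterator<T> it = source.iterator();
        return new Iterator<T>() {
            private T last = null; // offered for reuse on the next read

            @Override
            public boolean hasNext() { return it.hasNext(); }

            @Override
            public T next() {
                last = readInto.apply(it.next(), last);
                return last;
            }
        };
    }
}

/** Usage sketch with StringBuilder as a stand-in for a mutable record type. */
class ReuseSimulationDemo {
    static Iterable<StringBuilder> wrap(Iterable<StringBuilder> source) {
        return new ReuseSimulatingIterable<>(source, (src, reuse) -> {
            if (reuse == null) return new StringBuilder(src); // first read allocates
            reuse.setLength(0);                               // later reads overwrite
            reuse.append(src);
            return reuse;
        });
    }
}
```

Feeding a MapFn or DoFn a wrapped Iterable in a local test means any code that holds on to values without copying them sees only the last contents, just as it would on the cluster. The source objects themselves are never mutated, so the same input can be reused across tests.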
