On Wed, Jun 4, 2014 at 4:53 PM, Josh Wills <[email protected]> wrote:
> As long as we're on the subject, the fact that we don't serialize/deserialize DoFns before we run MemPipelines has burned me a few times when I would make a change and forget about the serialization implications until I tried to run the job. One of the things I like about the local Spark implementation is that it does this serialization check for you. So if we were to have some mode that could be enabled to allow us to simulate DoFn serialization and object re-use situations locally, even at the cost of extra runtime overhead, I would be happy.
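The serialization check Josh describes can be approximated with plain Java serialization: write the function object out and read it back before using it. This is a minimal sketch, assuming only that the function classes implement `java.io.Serializable` (Crunch's `DoFn` does). `SerializationCheck`, `roundTrip`, `GoodFn`, and `BadFn` are hypothetical names for illustration, not Crunch API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationCheck {

    // Hypothetical helper: serializes the function and deserializes a fresh copy,
    // roughly what happens when a function is shipped to a remote task.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn); // throws NotSerializableException for bad fields
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(
                "Function is not serializable: " + fn.getClass().getName(), e);
        }
    }

    // Stand-in for a function holding a non-serializable field.
    static class BadFn implements Serializable {
        private final Object lock = new Object(); // java.lang.Object is not Serializable
    }

    // Stand-in for a function whose state serializes cleanly.
    static class GoodFn implements Serializable {
        final String prefix = "x-";
    }

    public static void main(String[] args) {
        GoodFn copy = roundTrip(new GoodFn());
        System.out.println("GoodFn survived: " + copy.prefix);
        try {
            roundTrip(new BadFn());
            System.out.println("BadFn survived");
        } catch (IllegalStateException e) {
            System.out.println("BadFn rejected: " + e.getCause().getClass().getSimpleName());
        }
    }
}
```

A test-only mode like the boolean flag discussed in this thread could, in principle, pass each function through such a round trip before running it; how that would be wired into MemPipeline is an assumption, not something shown here. A common real-world failure this catches is an anonymous inner function class that silently captures a non-serializable enclosing instance.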
Yeah, that sounds like a good plan. It could probably be done via an overload of MemPipeline.getInstance() that takes a boolean flag indicating whether or not to replicate MR behavior.

>
>
> On Wed, Jun 4, 2014 at 4:27 AM, Gabriel Reid <[email protected]> wrote:
>
>> Hi David,
>>
>> Yeah, the whole object reuse situation in MapReduce is quite a drag. By the way, this isn't specific to Avro; it's just as big an issue with Writables.
>>
>> I'm kind of torn on the idea of building this into MemPipeline. On the one hand, the closer the behavior of the different pipeline implementations is, the better, and I think MemPipeline is currently indeed largely used for fast testing of MR workflows.
>>
>> On the other hand, MemPipeline can also be seen as an optimized implementation for running a pipeline faster if everything fits in memory, so adding extra serialization/deserialization just to mimic MRPipeline would be unfortunate. Additionally, there's also the SparkPipeline implementation to consider. I assume that Spark doesn't have this same object reuse situation, although I'm not actually sure.
>>
>> I think my general feeling is that I'd rather not add object reuse into MemPipeline, but I do think it would be great if we could bring the behavior of the two implementations in line. Any other thoughts on how we could do this?
>>
>> - Gabriel
>>
>>
>> On Wed, Jun 4, 2014 at 11:51 AM, David Whiting <[email protected]> wrote:
>> > We are facing some problems where errors that happen on the cluster are not caught by MemPipeline tests, due to the way Avro reuses objects when reading. If you do a parallelDo over a PGroupedTable, the Iterable of values you get actually returns the same object on every iteration, just with different contents. If you then store certain of these instances to be emitted later, you will get unexpected results unless you take a copy.
>> >
>> > To try to catch these problems locally, we've written a wrapper for Iterable<? extends SpecificRecord> that uses SpecificDatumWriter and SpecificDatumReader to write each record to a ByteBuffer and read it back on every iteration, offering the previously returned record for reuse. This allows us to run the offending MapFn or DoFn in isolation with a wrapped Iterable as input to identify and fix the problem.
>> >
>> > This, however, is a little awkward, and it would be much neater if it were driven from the Crunch side. At the point in the MapShuffler where the Iterable is wrapped in a SingleUseIterable, it could also be wrapped in one of these AvroReadSimulatingIterables, meaning that people could find these problems before running their jobs live for the first time. However, this problem is specific to Avro, so it obviously makes no sense to hack it right into HFunction.
>> >
>> > Does anyone have any thoughts on how this could be integrated nicely, or indeed whether it should be integrated at all?
>>
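The reuse problem David describes, and the wrapper idea for catching it locally, can be shown without Avro. This is a dependency-free sketch, assuming a hypothetical mutable `Rec` class in place of a `SpecificRecord` and a hypothetical `readInto(source, reuse)` function in place of the Avro write/read round trip (Avro's `DatumReader.read(reuse, decoder)` similarly fills in a caller-supplied instance). `ReuseSimulatingIterable`, `keepReferences`, and `keepCopies` are illustrative names, not Crunch or Avro API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

public class ReuseSimulation {

    // Hypothetical mutable record standing in for an Avro SpecificRecord.
    static final class Rec {
        String name;
        Rec(String name) { this.name = name; }
    }

    // Wraps an Iterable so every element is copied into one shared instance,
    // imitating a reader that is handed the previously returned record for reuse.
    static final class ReuseSimulatingIterable<T> implements Iterable<T> {
        private final Iterable<T> delegate;
        private final BiFunction<T, T, T> readInto; // (source, reuse) -> populated record

        ReuseSimulatingIterable(Iterable<T> delegate, BiFunction<T, T, T> readInto) {
            this.delegate = delegate;
            this.readInto = readInto;
        }

        @Override
        public Iterator<T> iterator() {
            Iterator<T> it = delegate.iterator();
            return new Iterator<T>() {
                private T reuse = null; // last record handed out

                @Override public boolean hasNext() { return it.hasNext(); }

                @Override public T next() {
                    reuse = readInto.apply(it.next(), reuse);
                    return reuse;
                }
            };
        }
    }

    // Copies source's contents into reuse, allocating only on the first call.
    static Rec readInto(Rec source, Rec reuse) {
        if (reuse == null) {
            return new Rec(source.name);
        }
        reuse.name = source.name;
        return reuse;
    }

    // Buggy pattern: stores references to whatever the iterator returned.
    static List<String> keepReferences(Iterable<Rec> values) {
        List<Rec> kept = new ArrayList<>();
        for (Rec r : values) kept.add(r);
        List<String> names = new ArrayList<>();
        for (Rec r : kept) names.add(r.name);
        return names;
    }

    // Fixed pattern: takes a defensive copy before storing.
    static List<String> keepCopies(Iterable<Rec> values) {
        List<Rec> kept = new ArrayList<>();
        for (Rec r : values) kept.add(new Rec(r.name));
        List<String> names = new ArrayList<>();
        for (Rec r : kept) names.add(r.name);
        return names;
    }

    public static void main(String[] args) {
        List<Rec> input = List.of(new Rec("a"), new Rec("b"), new Rec("c"));
        Iterable<Rec> wrapped = new ReuseSimulatingIterable<>(input, ReuseSimulation::readInto);
        System.out.println(keepReferences(input));   // [a, b, c]: distinct objects hide the bug
        System.out.println(keepReferences(wrapped)); // [c, c, c]: the bug shows up locally
        System.out.println(keepCopies(wrapped));     // [a, b, c]
    }
}
```

Because the wrapper is generic over the copy function, the same shape could in principle serve Writables as well as Avro records, which fits the point that reuse is not Avro-specific; a real Avro version would replace `readInto` with a SpecificDatumWriter/SpecificDatumReader round trip.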
