To summarize:

- When we saw data duplication, that was what we should have been expecting, given our implementation. That is not the issue.
- Sometimes we didn't see data duplication. That is an issue: *union sometimes ignores one of the input branches.*
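A minimal sketch of the kind of shape I mean; the class name and paths are illustrative only, not our actual code:

```java
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;

public class UnionSketch {
  public static void main(String[] args) {
    Pipeline pipeline = new MRPipeline(UnionSketch.class);

    // Two independently read inputs (paths are illustrative).
    PCollection<String> left = pipeline.readTextFile("/tmp/union-sketch/left");
    PCollection<String> right = pipeline.readTextFile("/tmp/union-sketch/right");

    // Union the two branches.
    PCollection<String> both = left.union(right);

    // Adding a materialize (or a parallelDo) directly after the union is the
    // pattern where, on 0.4, the result sometimes only contained the records
    // of one of the two branches.
    for (String line : both.materialize()) {
      System.out.println(line);
    }

    pipeline.done();
  }
}
```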
I created https://issues.apache.org/jira/browse/CRUNCH-163 for this issue. The tests singleUnion and doubleUnionWithoutMaterializeInbetween pass in my environment (0.4); the others fail. Besides breaking a union by adding a materialize after it, I could also break it by performing a parallelDo after it, or by just joining two read PCollections.

Cheers,

Tim

On Tue, Feb 5, 2013 at 3:38 PM, Tim van Heugten <[email protected]> wrote:
> Hmmm,
>
> So we had a mistake in our code that emitted the data in both branches before union2.
> *And*, the crunch union also *failed to merge the data* in some circumstance. My side-remark about not seeing the join happen was actually bang on.. :-/
>
> So the question now becomes, when does a union ignore one of its incoming branches?
> Apparently with materialization in the right spots we can force the correct pipeline(*).
>
> Cheers,
>
> Tim van Heugten
>
>
> *) Thereby exposing our bug, seemingly data duplication. But just to be clear, this is actually the *correct* behavior.
>
>
> On Tue, Feb 5, 2013 at 3:18 PM, Tim van Heugten <[email protected]> wrote:
>
>> Hi,
>>
>> It turns out the data in the two branches that are unioned in union2 is not mutually exclusive (counter to what I was expecting). Probably we should expect data duplication.
>>
>> However, this still does not explain why sometimes we find data duplication and sometimes we don't.
>>
>> Will keep you posted,
>>
>> Tim
>>
>>
>> On Tue, Feb 5, 2013 at 11:32 AM, Tim van Heugten <[email protected]> wrote:
>>
>>> Hi Gabriel,
>>>
>>> I've been unsuccessful so far in reproducing the issue in a controlled environment. As said, it's fragile; maybe the types involved play a role, because when I tried to simplify those I broke the failure condition.
>>> I decided it's time to try providing more information without giving an explicit example.
>>>
>>> The pipeline we build is illustrated here: http://yuml.me/8ef99512.
>>> Depending on where we materialize, the data occurs twice in UP.
>>> The EITPI job filters the exact opposite of the filter branch. In PWR only data from EITPI is passed through, while the PITP data is used to modify it.
>>> Below you find the job names as executed when data duplication occurs; materializations occur before BTO(*) and after UP.
>>>
>>> "Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch655004156/p4)"
>>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch655004156/p4)]]+GBK+PWR+UnionCollectionWrapper+Avro(/tmp/crunch655004156/p2)"
>>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch655004156/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch655004156/p8)"
>>> "[[Avro(target/stored/sIPhase)+S0+BTO]/[Avro(/tmp/crunch655004156/p8)]]+GBK+UP+Avro(/tmp/crunch655004156/p6)"
>>> "Avro(/tmp/crunch655004156/p6)+GetData+Avro(/tmp/crunch655004156/p10)"
>>> "Avro(/tmp/crunch655004156/p6)+GetTraces+Avro(target/trace-dump/traces)"
>>>
>>> Here are the jobs performed when materialization is added between BTO and gbk:
>>>
>>> "Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch-551174870/p4)"
>>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch-551174870/p4)]]+GBK+PWR+UnionCollectionWrapper+Avro(/tmp/crunch-551174870/p2)"
>>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch-551174870/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch-551174870/p6)"
>>> "Avro(/tmp/crunch-551174870/p6)+GBK+UP+Avro(/tmp/crunch-551174870/p8)"
>>> "Avro(/tmp/crunch-551174870/p8)+GetData+Avro(/tmp/crunch-551174870/p10)"
>>> "Avro(/tmp/crunch-551174870/p8)+GetTraces+Avro(target/trace-dump/traces)"
>>>
>>> Without changing anything else, the added materialization fixes the issue of data duplication (a sketch of this follows at the end of this message).
>>>
>>> If you have any clues on how I can extract a clean working example, I'm happy to hear them.
>>>
>>>
>>> *) This materialization probably explains the second job; however, where the filtered data is joined is lost on me. This is not the cause though: with just one materialize at the end, after UP, the data count still doubled. The jobs then look like this:
>>>
>>> "Avro(target/stored/sIPhase)+EITPI+GBK+PITEI+Avro(/tmp/crunch369510677/p4)"
>>> "[[Avro(target/stored/sIPhase)+PITP]/[Avro(/tmp/crunch369510677/p4)]]+GBK+PWR+BTO+Avro(/tmp/crunch369510677/p6)"
>>> "[[Avro(target/stored/sIPhase)+S0+BTO]/[Avro(/tmp/crunch369510677/p6)]]+GBK+UP+Avro(/tmp/crunch369510677/p2)"
>>> "Avro(/tmp/crunch369510677/p2)+GetTraces+Avro(target/trace-dump/traces)"
>>> "Avro(/tmp/crunch369510677/p2)+GetData+Avro(/tmp/crunch369510677/p8)"
>>>
>>> BR,
>>>
>>> Tim van Heugten
>>>
>>>
>>> On Thu, Jan 31, 2013 at 9:27 PM, Gabriel Reid <[email protected]> wrote:
>>>
>>>> Hi Tim,
>>>>
>>>> On 31 Jan 2013, at 10:45, Tim van Heugten <[email protected]> wrote:
>>>>
>>>> > Hi Gabriel,
>>>> >
>>>> > For the most part it is similar to what was sent around recently on this mailing list, see:
>>>> > From Dave Beech <[email protected]>
>>>> > Subject Question about mapreduce job planner
>>>> > Date Tue, 15 Jan 2013 11:41:42 GMT
>>>> >
>>>> > So, the common path, before multiple outputs branch off, is executed twice. Sometimes the issues seem related to unions though, i.e. multiple inputs. We seem to have been troubled by a grouped table parallelDo on a table-union-gbk that got its data twice (all groups doubled in size). Inserting a materialize between the union and groupByKey solved the issue.
>>>> >
>>>> > These issues seem very fragile (so they're fixed easily by changing something that's irrelevant to the output), so usually we just add or remove a materialization to make it run again.
>>>> > I'll see if I can cleanly reproduce the data duplication issue later this week.
>>>>
>>>> Ok, that would be great if you could replicate it in a small test, thanks!
>>>>
>>>> - Gabriel
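For completeness, a minimal sketch of the "materialize between the union and the groupByKey" workaround mentioned in the quoted thread; the class names, key function, and paths are illustrative only, not our actual pipeline:

```java
import org.apache.crunch.MapFn;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.writable.Writables;

public class UnionGbkWorkaroundSketch {

  // Key each line by its first whitespace-separated token (illustrative only).
  static class FirstToken extends MapFn<String, String> {
    @Override
    public String map(String line) {
      return line.split("\\s+", 2)[0];
    }
  }

  public static void main(String[] args) {
    Pipeline pipeline = new MRPipeline(UnionGbkWorkaroundSketch.class);

    PTable<String, String> tableA =
        pipeline.readTextFile("/tmp/sketch/a").by(new FirstToken(), Writables.strings());
    PTable<String, String> tableB =
        pipeline.readTextFile("/tmp/sketch/b").by(new FirstToken(), Writables.strings());

    PTable<String, String> unioned = tableA.union(tableB);

    // The workaround: a materialize between the union and the groupByKey.
    // The returned Iterable is not consumed here; the call makes the planner
    // write the union's output out before the GBK job reads it, which in our
    // case removed the doubled groups.
    unioned.materialize();

    pipeline.writeTextFile(unioned.groupByKey().ungroup(), "/tmp/sketch/out");
    pipeline.done();
  }
}
```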
