I've just tested it. It fails. Also added the test to the repo: https://github.com/orian/cogroup-wrong-grouping
I suppose this means that GroupByKey itself is flawed? If you open an official issue, please add it to the discussion.

2016-05-31 11:55 GMT+02:00 Aljoscha Krettek:

> Does 2. work for the cases where CoGroupByKey fails? The reason I'm asking is that CoGroupByKey is essentially implemented like that internally: create tagged union -> flatten -> GroupByKey.
>
> On Tue, 31 May 2016 at 01:16 Pawel Szczur wrote:
>
>> I've naively tried a few other key types; the problem seems to be unrelated to the key type.
>>
>> For now I have two workarounds, plus ignorance:
>> 1. If there is one dominant dataset and the other datasets are small (size << GB), I use a side input.
>> 2. If I have multiple datasets of similar size, I wrap them in a common container, flatten them and apply GroupByKey.
>> 3. I count occurrences and ignore the bug for now.
>>
>> Do you have an idea how a test for this could be constructed? It would be handy, I think.
>>
>> I also found two things that may help you:
>> 1. the issue doesn't appear without parallelism;
>> 2. the issue doesn't appear with tiny datasets.
>>
>> 2016-05-30 17:13 GMT+02:00 Aljoscha Krettek:
>>
>>> You're right. I'm still looking into this; unfortunately I haven't made progress so far. I'll keep you posted.
>>>
>>> On Sun, 29 May 2016 at 18:20 Pawel Szczur wrote:
>>>
>>>> Hi,
>>>>
>>>> I used the config as in the repo.
>>>> Please grep the log for "hereGoesLongStringID0,2"; you will see that this key is processed multiple times.
>>>>
>>>> This is how I understand CoGroupByKey: one has two (or more) PCollection<KV<K,?>>. Both sets are grouped by key. For each unique key a KV<K, CoGbkResult> is produced, and a given CoGbkResult contains all values from all input PCollections that have the given key.
>>>>
>>>> But from the log it seems that each key produced more than one CoGbkResult.
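To make the CoGroupByKey contract described just above concrete, and as one possible answer to the question "how a test for this could be constructed", here is a plain-Java model using only java.util collections. It is not Beam or Flink code and makes no claim about how either runner is implemented; it only follows Aljoscha's description (tag each input, flatten, group by key). The names CoGroupModel, Tagged, tagAndFlatten, groupByKey and duplicatedKeys are hypothetical. The last method is the invariant a real test could assert on the keys of the CoGbkResults a job emits: the returned list should always be empty.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the CoGroupByKey contract (not Beam code).
final class CoGroupModel {

  // One element of the "tagged union": the index of the input it came from, plus key and value.
  static final class Tagged<K, V> {
    final int tag;
    final K key;
    final V value;

    Tagged(int tag, K key, V value) {
      this.tag = tag;
      this.key = key;
      this.value = value;
    }
  }

  // "create tagged union -> flatten": tag every element with its input index
  // and put the elements of all inputs into a single list.
  static <K, V> List<Tagged<K, V>> tagAndFlatten(List<List<Map.Entry<K, V>>> inputs) {
    List<Tagged<K, V>> flat = new ArrayList<>();
    for (int tag = 0; tag < inputs.size(); tag++) {
      for (Map.Entry<K, V> e : inputs.get(tag)) {
        flat.add(new Tagged<>(tag, e.getKey(), e.getValue()));
      }
    }
    return flat;
  }

  // "-> GroupByKey": exactly one entry per distinct key, holding one value list
  // per input tag (analogous to what a CoGbkResult exposes for each TupleTag).
  static <K, V> Map<K, List<List<V>>> groupByKey(List<Tagged<K, V>> flat, int numInputs) {
    Map<K, List<List<V>>> grouped = new LinkedHashMap<>();
    for (Tagged<K, V> t : flat) {
      List<List<V>> perTag = grouped.get(t.key);
      if (perTag == null) {
        perTag = new ArrayList<>();
        for (int i = 0; i < numInputs; i++) {
          perTag.add(new ArrayList<>());
        }
        grouped.put(t.key, perTag);
      }
      perTag.get(t.tag).add(t.value);
    }
    return grouped;
  }

  // Invariant check for a real run: given the keys of all emitted results,
  // return every key that was emitted more than once (should be empty).
  static <K> List<K> duplicatedKeys(List<K> emittedKeys) {
    Map<K, Integer> counts = new LinkedHashMap<>();
    for (K k : emittedKeys) {
      counts.merge(k, 1, Integer::sum);
    }
    List<K> dups = new ArrayList<>();
    for (Map.Entry<K, Integer> e : counts.entrySet()) {
      if (e.getValue() > 1) {
        dups.add(e.getKey());
      }
    }
    return dups;
  }
}
```

In a real pipeline the emitted keys would have to be collected from the job (for example from log lines like the ones grepped for "hereGoesLongStringID0,2") and passed to duplicatedKeys; a non-empty result means some key produced more than one CoGbkResult.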
>>>>
>>>> The final counters didn't catch the bug because, in your case, the value from dataset1 was replicated for each key.
>>>>
>>>> Cheers, Pawel
>>>>
>>>> 2016-05-29 15:59 GMT+02:00 Aljoscha Krettek:
>>>>
>>>>> Hi,
>>>>> I ran your data generator with these configs:
>>>>>
>>>>> p.apply(Create.of(new Config(3, 5, 600_000, 1)))
>>>>>     .apply(ParDo.of(new Generator()))
>>>>>     .apply(AvroIO.Write.to("/tmp/dataset1").withSchema(DumbData.class).withNumShards(6));
>>>>>
>>>>> p.apply(Create.of(new Config(3, 5, 600_000, 2)))
>>>>>     .apply(ParDo.of(new Generator()))
>>>>>     .apply(AvroIO.Write.to("/tmp/dataset2").withSchema(DumbData.class).withNumShards(6));
>>>>>
>>>>> Then I ran the job with parallelism=6. I couldn't reproduce the problem; this is the log file from one of several runs:
>>>>> https://gist.github.com/aljoscha/ef1d804f57671cd472c75b92b4aee51b
>>>>>
>>>>> Could you please send me the exact config that you used? Btw, I ran it inside an IDE. Do the problems also occur in the IDE for you, or only when you execute on a cluster?
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Sun, 29 May 2016 at 01:51 Pawel Szczur wrote:
>>>>>
>>>>>> Hi Aljoscha.
>>>>>>
>>>>>> I've created a repo with a fake dataset so the problem is easy to reproduce:
>>>>>> https://github.com/orian/cogroup-wrong-grouping
>>>>>>
>>>>>> What I noticed: if the dataset is too small, the bug doesn't appear.
>>>>>>
>>>>>> You can modify the size of the dataset, but ideally it should be a few hundred thousand records per key (I guess it depends on the machine you run it on).
>>>>>>
>>>>>> Cheers, Pawel
>>>>>>
>>>>>> 2016-05-28 12:45 GMT+02:00 Aljoscha Krettek:
>>>>>>
>>>>>>> Hi,
>>>>>>> which version of Beam/Flink are you using?
>>>>>>>
>>>>>>> Could you maybe also provide example data and code that showcase the problem? If you have concerns about sending it to a public list, you can also send it to me directly.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On Fri, 27 May 2016 at 20:53 Pawel Szczur wrote:
>>>>>>>
>>>>>>>> *Data description.*
>>>>>>>>
>>>>>>>> I have two datasets.
>>>>>>>>
>>>>>>>> Records - the first one contains around 0.5-1M records per (key, day). For testing I use 2-3 keys and 5-10 days of data. What I'm aiming for is 1000+ keys. Each record contains a key, a timestamp in microseconds and some other data.
>>>>>>>> Configs - the second one is rather small. It describes the key over time; you can think of it as a list of tuples: (key, start date, end date, description).
>>>>>>>>
>>>>>>>> For the exploration I've encoded the data as files of length-prefixed, binary-encoded Protocol Buffer messages. Additionally, the files are compressed with gzip. The data is sharded by date. Each file is around 10MB.
>>>>>>>>
>>>>>>>> *Pipeline*
>>>>>>>>
>>>>>>>> First I add keys to both datasets. For the Records dataset the key is (key, day-rounded timestamp). For Configs the key is (key, day), where day is each timestamp value between the start date and the end date (pointing at midnight).
>>>>>>>> The datasets are merged using CoGroupByKey.
>>>>>>>>
>>>>>>>> As the key type I use org.apache.flink.api.java.tuple.Tuple2 with a Tuple2Coder from this repo.
>>>>>>>>
>>>>>>>> *The problem*
>>>>>>>>
>>>>>>>> If the Records dataset is tiny, like 5 days, everything seems fine (check normal_run.log).
>>>>>>>>
>>>>>>>> INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
>>>>>>>> INFO [main] (FlinkPipelineRunner.java:127) - item count : 4322332
>>>>>>>> INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 0
>>>>>>>> INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>>>>>>>>
>>>>>>>> When I run the pipeline against 10+ days, I get an error indicating that some Records have no Config (wrong_run.log).
>>>>>>>>
>>>>>>>> INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
>>>>>>>> INFO [main] (FlinkPipelineRunner.java:127) - item count : 8577197
>>>>>>>> INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 6
>>>>>>>> INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>>>>>>>>
>>>>>>>> Then I added some extra logging messages:
>>>>>>>>
>>>>>>>> (ConvertToItem.java:144) - 68643 items for KeyValue3 on: 1462665600000000
>>>>>>>> (ConvertToItem.java:140) - no items for KeyValue3 on: 1463184000000000
>>>>>>>> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462924800000000
>>>>>>>> (ConvertToItem.java:142) - 753707 items for KeyValue3 on: 1462924800000000 marked as no-loc
>>>>>>>> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462752000000000
>>>>>>>> (ConvertToItem.java:142) - 749901 items for KeyValue3 on: 1462752000000000 marked as no-loc
>>>>>>>> (ConvertToItem.java:144) - 754578 items for KeyValue3 on: 1462406400000000
>>>>>>>> (ConvertToItem.java:144) - 751574 items for KeyValue3 on: 1463011200000000
>>>>>>>> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462665600000000
>>>>>>>> (ConvertToItem.java:142) - 754758 items for KeyValue3 on: 1462665600000000 marked as no-loc
>>>>>>>> (ConvertToItem.java:123) - missing for KeyValue3 on: 1463184000000000
>>>>>>>> (ConvertToItem.java:142) - 694372 items for KeyValue3 on: 1463184000000000 marked as no-loc
>>>>>>>>
>>>>>>>> You can spot that in the first line 68643 items were processed for KeyValue3 and time 1462665600000000.
>>>>>>>> Later on, in line 9, the operation seems to process the same key again, but it reports that no Config was available for these Records. Line 10 says they have been marked as no-loc.
>>>>>>>>
>>>>>>>> Line 2 says that there were no items for KeyValue3 and time 1463184000000000, but in line 11 you can read that the items for this (key, day) pair were processed later and lacked a Config.
>>>>>>>>
>>>>>>>> *Work-around (after more testing, doesn't work, staying with Tuple2)*
>>>>>>>>
>>>>>>>> I've switched from using Tuple2 to a Protocol Buffer message:
>>>>>>>>
>>>>>>>> message KeyDay {
>>>>>>>>   optional bytes key = 1;
>>>>>>>>   optional int64 timestamp_usec = 2;
>>>>>>>> }
>>>>>>>>
>>>>>>>> But using Tuple2.of() was just easier than:
>>>>>>>> KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build().
>>>>>>>>
>>>>>>>> // The original description comes from:
>>>>>>>> http://stackoverflow.com/questions/37473682/items-not-groupped-correctly-cogroupbykey
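The keying step from the *Pipeline* section can also be sketched in plain Java. This is a minimal sketch under stated assumptions: timestamps are microseconds since the Unix epoch in UTC, a Config covers every day from the day of its start date through the day of its end date inclusive (one reading of "each timestamp value between start date and end date"), and the names DayKeys, KeyDay, roundToDayUsec, recordKey and expandConfig are hypothetical; KeyDay mirrors the protobuf message above only in spirit. Since the misgrouping was reported regardless of key type, this is not offered as a fix; it only shows what a correct (key, day) join key needs: value-based equality with a matching hashCode.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical helpers for the (key, day) join key; timestamps are assumed to be
// microseconds since the Unix epoch, in UTC.
final class DayKeys {
  static final long USEC_PER_DAY = 86_400_000_000L;

  // Round a microsecond timestamp down to the preceding UTC midnight.
  // Math.floorDiv also rounds pre-1970 (negative) timestamps downwards.
  static long roundToDayUsec(long tsUsec) {
    return Math.floorDiv(tsUsec, USEC_PER_DAY) * USEC_PER_DAY;
  }

  // Composite key compared by value: every record of the same (key, day) must
  // produce an equal key with the same hashCode. Beam's GroupByKey additionally
  // expects the key's coder to be deterministic.
  static final class KeyDay {
    final String key;
    final long dayUsec;

    KeyDay(String key, long dayUsec) {
      this.key = Objects.requireNonNull(key);
      this.dayUsec = dayUsec;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof KeyDay)) {
        return false;
      }
      KeyDay other = (KeyDay) o;
      return dayUsec == other.dayUsec && key.equals(other.key);
    }

    @Override
    public int hashCode() {
      return Objects.hash(key, dayUsec);
    }

    @Override
    public String toString() {
      return "(" + key + ", " + dayUsec + ")";
    }
  }

  // Records side: key each record by (key, midnight of its timestamp's day).
  static KeyDay recordKey(String key, long tsUsec) {
    return new KeyDay(key, roundToDayUsec(tsUsec));
  }

  // Configs side: one KeyDay per UTC midnight from the start day through the
  // end day, inclusive (assumed interpretation of the date range).
  static List<KeyDay> expandConfig(String key, long startUsec, long endUsec) {
    List<KeyDay> out = new ArrayList<>();
    for (long day = roundToDayUsec(startUsec); day <= endUsec; day += USEC_PER_DAY) {
      out.add(new KeyDay(key, day));
    }
    return out;
  }
}
```

As a consistency check, every day value in the log excerpt above (for example 1462665600000000 and 1463184000000000) is an exact multiple of USEC_PER_DAY, i.e. a UTC midnight, which matches day-rounded keys as described.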
