Thank you for opening the issue and providing the example program to reproduce the problem. Yes, it appears to be a bug in the Flink runner. We're looking into it.
On Tue, May 31, 2016 at 3:47 PM, Pawel Szczur <[email protected]> wrote:

FYI, I've opened an issue: https://issues.apache.org/jira/browse/BEAM-315

2016-05-31 14:51 GMT+02:00 Pawel Szczur <[email protected]>:

I've also added a test for GroupByKey. It fails. That kind of makes Flink broken at the moment, doesn't it?

I'm wondering: could it be related to some windowing issue?

2016-05-31 14:40 GMT+02:00 Pawel Szczur <[email protected]>:

I've just tested it. It fails.

I also added the test to the repo: https://github.com/orian/cogroup-wrong-grouping

I take it this means that GroupByKey itself is flawed? If you open an official issue, please add it to the discussion.

2016-05-31 11:55 GMT+02:00 Aljoscha Krettek <[email protected]>:

Does workaround 2 (flatten + GroupByKey) work for the cases where CoGroupByKey fails? The reason I'm asking is that CoGroupByKey is essentially implemented like that internally: create a tagged union -> flatten -> GroupByKey.

On Tue, 31 May 2016 at 01:16 Pawel Szczur <[email protected]> wrote:

I've naively tried a few other key types; the problem seems to be unrelated to the key type.

For now I have two workarounds, plus simply ignoring the bug:
1. If there is one dominant dataset and the other datasets are small (size << GB), I use a SideInput.
2. If I have multiple datasets of similar size, I wrap them in a common container, flatten them and GroupByKey.
3. I measure the occurrences and ignore the bug for now.

Do you have an idea how a test for this could be constructed? It seems handy, I think.

I also found two things that may help you:
1. The issue doesn't appear without parallelism.
2. The issue doesn't appear with tiny datasets.

2016-05-30 17:13 GMT+02:00 Aljoscha Krettek <[email protected]>:

You're right. I'm still looking into this; unfortunately I haven't made progress so far. I'll keep you posted.

On Sun, 29 May 2016 at 18:20 Pawel Szczur <[email protected]> wrote:

Hi,

I used the config as in the repo. Please grep the log for "hereGoesLongStringID0,2"; you will see that this key is processed multiple times.

This is how I understand CoGroupByKey: one has two (or more) PCollection<KV<K, ?>>. Both sets are grouped by key. For each unique key a KV<K, CoGbkResult> is produced, and a given CoGbkResult contains all values from all input PCollections that have the given key.

But from the log it seems that each key produced more than one CoGbkResult.

The final counters didn't catch the bug because in your case the value from dataset1 was replicated for each key.

Cheers, Pawel
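To make the expected semantics concrete, here is a minimal, runnable sketch of that pattern. It is not code from the thread: String stands in for the real Record and Config element types, and the key, tag and transform names are made up for illustration.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

public class CoGroupByKeySketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Two keyed inputs; String stands in for the real Record and Config types.
    KvCoder<String, String> coder = KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
    PCollection<KV<String, String>> records = p.apply("Records",
        Create.of(KV.of("KeyValue3", "record-a"), KV.of("KeyValue3", "record-b")).withCoder(coder));
    PCollection<KV<String, String>> configs = p.apply("Configs",
        Create.of(KV.of("KeyValue3", "config-1")).withCoder(coder));

    final TupleTag<String> recordsTag = new TupleTag<String>() {};
    final TupleTag<String> configsTag = new TupleTag<String>() {};

    // Expected behaviour: exactly one KV<K, CoGbkResult> per distinct key,
    // carrying every value from every input PCollection for that key.
    PCollection<KV<String, CoGbkResult>> joined =
        KeyedPCollectionTuple.of(recordsTag, records)
            .and(configsTag, configs)
            .apply(CoGroupByKey.<String>create());

    joined.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Iterable<String> recs = c.element().getValue().getAll(recordsTag);
        Iterable<String> cfgs = c.element().getValue().getAll(configsTag);
        // The reported bug: some keys appear in more than one CoGbkResult,
        // one of which sees the records but not the matching config.
        c.output(c.element().getKey() + ": " + recs + " / " + cfgs);
      }
    }));

    p.run();
  }
}

Under the semantics described above, the ParDo should see "KeyValue3" exactly once, with both records and the config delivered in the same CoGbkResult.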
2016-05-29 15:59 GMT+02:00 Aljoscha Krettek <[email protected]>:

Hi,

I ran your data generator with these configs:

p.apply(Create.of(new Config(3, 5, 600_000, 1)))
    .apply(ParDo.of(new Generator()))
    .apply(AvroIO.Write.to("/tmp/dataset1")
        .withSchema(DumbData.class)
        .withNumShards(6));

p.apply(Create.of(new Config(3, 5, 600_000, 2)))
    .apply(ParDo.of(new Generator()))
    .apply(AvroIO.Write.to("/tmp/dataset2")
        .withSchema(DumbData.class)
        .withNumShards(6));

Then I ran the job with parallelism=6. I couldn't reproduce the problem; this is the log file from one of several runs: https://gist.github.com/aljoscha/ef1d804f57671cd472c75b92b4aee51b

Could you please send me the exact config that you used? By the way, I ran it inside an IDE; do the problems also occur in the IDE for you, or only when you execute on a cluster?

Cheers,
Aljoscha

On Sun, 29 May 2016 at 01:51 Pawel Szczur <[email protected]> wrote:

Hi Aljoscha,

I've created a repo with a fake dataset to make it easy to reproduce the problem: https://github.com/orian/cogroup-wrong-grouping

What I noticed: if the dataset is too small, the bug doesn't appear.

You can modify the size of the dataset, but ideally it should be a few hundred thousand records per key (I guess it depends on the machine you run it on).

Cheers, Pawel

2016-05-28 12:45 GMT+02:00 Aljoscha Krettek <[email protected]>:

Hi,

which version of Beam/Flink are you using?

Could you maybe also provide example data and code that showcases the problem? If you have concerns about sending it to a public list, you can also send it to me directly.

Cheers,
Aljoscha

On Fri, 27 May 2016 at 20:53 Pawel Szczur <[email protected]> wrote:

Data description

I have two datasets.

Records - the first contains around 0.5-1M records per (key, day). For testing I use 2-3 keys and 5-10 days of data. What I shoot for is 1000+ keys. Each record contains a key, a timestamp in microseconds and some other data.

Configs - the second is rather small. It describes the key in time, e.g. you can think of it as a list of tuples: (key, start date, end date, description).

For the exploration I've encoded the data as files of length-prefixed, binary-encoded Protocol Buffer messages. Additionally, the files are packed with gzip. Data is sharded by date. Each file is around 10MB.

Pipeline

First I add keys to both datasets. For the Records dataset the key is (key, day-rounded timestamp). For Configs the key is (key, day), where day is each midnight timestamp between the start date and the end date. The datasets are merged using CoGroupByKey.

As the key type I use org.apache.flink.api.java.tuple.Tuple2 with a Tuple2Coder from this repo.
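As an aside, the keying step described here could look roughly like the sketch below. It is illustrative only, not code from the repo: it uses a plain KV<String, Long> composite key instead of Tuple2/Tuple2Coder, reduces a record to (key, timestampUsec) and a config to (key, (startUsec, endUsec)), and the field layout and day constant are assumptions.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/** Illustrative only: (key, day) keying with KV<String, Long> in place of Tuple2. */
public class KeyByDaySketch {
  // One day in microseconds, matching the microsecond timestamps described above.
  private static final long DAY_USEC = 24L * 60 * 60 * 1_000_000L;

  /** Records side: (key, timestampUsec) -> ((key, day), timestampUsec). */
  public static PCollection<KV<KV<String, Long>, Long>> keyRecordsByDay(
      PCollection<KV<String, Long>> records) {
    return records.apply("KeyRecordsByDay",
        ParDo.of(new DoFn<KV<String, Long>, KV<KV<String, Long>, Long>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            String key = c.element().getKey();
            long tsUsec = c.element().getValue();
            long day = tsUsec - (tsUsec % DAY_USEC); // round down to midnight
            c.output(KV.of(KV.of(key, day), tsUsec));
          }
        }));
  }

  /** Configs side: (key, (startUsec, endUsec)) -> one ((key, day), config) per covered day. */
  public static PCollection<KV<KV<String, Long>, String>> expandConfigsByDay(
      PCollection<KV<String, KV<Long, Long>>> configs) {
    return configs.apply("ExpandConfigsByDay",
        ParDo.of(new DoFn<KV<String, KV<Long, Long>>, KV<KV<String, Long>, String>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            String key = c.element().getKey();
            long startUsec = c.element().getValue().getKey();
            long endUsec = c.element().getValue().getValue();
            // Emit one entry per midnight between start and end, inclusive.
            for (long day = startUsec - (startUsec % DAY_USEC); day <= endUsec; day += DAY_USEC) {
              c.output(KV.of(KV.of(key, day), "config-for-" + key));
            }
          }
        }));
  }
}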
The problem

If the Records dataset is tiny, like 5 days, everything seems fine (check normal_run.log):

INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
INFO [main] (FlinkPipelineRunner.java:127) - item count : 4322332
INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 0
INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0

When I run the pipeline against 10+ days, I encounter an error indicating that for some Records there's no Config (wrong_run.log):

INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
INFO [main] (FlinkPipelineRunner.java:127) - item count : 8577197
INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 6
INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0

Then I added some extra logging messages:

(ConvertToItem.java:144) - 68643 items for KeyValue3 on: 1462665600000000
(ConvertToItem.java:140) - no items for KeyValue3 on: 1463184000000000
(ConvertToItem.java:123) - missing for KeyValue3 on: 1462924800000000
(ConvertToItem.java:142) - 753707 items for KeyValue3 on: 1462924800000000 marked as no-loc
(ConvertToItem.java:123) - missing for KeyValue3 on: 1462752000000000
(ConvertToItem.java:142) - 749901 items for KeyValue3 on: 1462752000000000 marked as no-loc
(ConvertToItem.java:144) - 754578 items for KeyValue3 on: 1462406400000000
(ConvertToItem.java:144) - 751574 items for KeyValue3 on: 1463011200000000
(ConvertToItem.java:123) - missing for KeyValue3 on: 1462665600000000
(ConvertToItem.java:142) - 754758 items for KeyValue3 on: 1462665600000000 marked as no-loc
(ConvertToItem.java:123) - missing for KeyValue3 on: 1463184000000000
(ConvertToItem.java:142) - 694372 items for KeyValue3 on: 1463184000000000 marked as no-loc

You can spot that in line 1, 68643 items were processed for KeyValue3 and time 1462665600000000. Later on, in line 9, it seems the operation processes the same key again, but reports that no Config was available for these Records; line 10 shows they've been marked as no-loc.

Line 2 says there were no items for KeyValue3 and time 1463184000000000, but in line 11 you can read that items for this (key, day) pair were processed later, and they also lacked a Config.

Work-around (after more testing: it doesn't work, staying with Tuple2)

I've switched from using Tuple2 to a Protocol Buffer message:

message KeyDay {
  optional bytes key = 1;
  optional int64 timestamp_usec = 2;
}

But using Tuple2.of() was just easier than KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build().

// The original description comes from: http://stackoverflow.com/questions/37473682/items-not-groupped-correctly-cogroupbykey
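For completeness, the "common container, flatten, GroupByKey" workaround mentioned earlier in the thread (and the shape Aljoscha describes CoGroupByKey having internally: tagged union -> flatten -> GroupByKey) could look roughly like this sketch. Again, it is not code from the thread: String stands in for both element types, and the integer tags and names are arbitrary.

import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

/** Illustrative only: join two keyed collections via tagged union + flatten + GroupByKey. */
public class FlattenJoinSketch {
  public static PCollection<KV<String, Iterable<KV<Integer, String>>>> join(
      PCollection<KV<String, String>> records, PCollection<KV<String, String>> configs) {

    // Tag each value with an integer marking which dataset it came from.
    PCollection<KV<String, KV<Integer, String>>> taggedRecords = records.apply("TagRecords",
        MapElements.via(new SimpleFunction<KV<String, String>, KV<String, KV<Integer, String>>>() {
          @Override
          public KV<String, KV<Integer, String>> apply(KV<String, String> e) {
            return KV.of(e.getKey(), KV.of(0, e.getValue()));
          }
        }));

    PCollection<KV<String, KV<Integer, String>>> taggedConfigs = configs.apply("TagConfigs",
        MapElements.via(new SimpleFunction<KV<String, String>, KV<String, KV<Integer, String>>>() {
          @Override
          public KV<String, KV<Integer, String>> apply(KV<String, String> e) {
            return KV.of(e.getKey(), KV.of(1, e.getValue()));
          }
        }));

    // Flatten into a single collection and group: each key should come out exactly once,
    // with the tag on each value telling which input it belongs to.
    return PCollectionList.of(taggedRecords).and(taggedConfigs)
        .apply(Flatten.<KV<String, KV<Integer, String>>>pCollections())
        .apply(GroupByKey.<String, KV<Integer, String>>create());
  }
}

This is why Aljoscha asks whether workaround 2 also fails: per the later messages in the thread it does, which points at the runner's GroupByKey itself rather than only at the CoGbkResult assembly on top of it.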
