Thank you for opening the issue and providing the example program to
reproduce the problem. Yes, it appears to be a bug in the Runner.
We're looking into it.

On Tue, May 31, 2016 at 3:47 PM, Pawel Szczur <[email protected]> wrote:
> FYI I've opened an issue: https://issues.apache.org/jira/browse/BEAM-315
>
> 2016-05-31 14:51 GMT+02:00 Pawel Szczur <[email protected]>:
>>
>> I've also added the test for GroupByKey. It fails. It kind of makes Flink
>> broken at the moment, isn't it?
>>
>> I'm wondering.. may it be related to some Windowing issue?
>>
>> 2016-05-31 14:40 GMT+02:00 Pawel Szczur <[email protected]>:
>>>
>>> I've just tested it. It fails.
>>>
>>> Also added the test to the repo:
>>> https://github.com/orian/cogroup-wrong-grouping
>>>
>>> I reason, this means that GroupByKey is flawed? If you open an official
>>> issue, please add it to discussion.
>>>
>>> 2016-05-31 11:55 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>
>>>> Does 2. work for the cases where CoGroupByKey fails? Reason I'm asking
>>>> is that CoGroupByKey is essentially implemented like that internally: 
>>>> create
>>>> tagged union -> flatten -> GroupByKey.
>>>>
>>>> On Tue, 31 May 2016 at 01:16 Pawel Szczur <[email protected]> wrote:
>>>>>
>>>>> I've naively tried few other key types, it seems to be unrelated to key
>>>>> type.
>>>>>
>>>>> As for now I have two workarounds and ignorance:
>>>>>  1. If there is one dominant dataset and other datasets are small (size
>>>>> << GB) then I use SideInput.
>>>>>  2. If I have multiple datasets of similar size I enclose it in a
>>>>> common container, flatten it and GroupByKey.
>>>>>  3. I measure occurrences and ignore the bug for now.
>>>>>
>>>>> Do you have an idea how a test for this may be constructed? It seems
>>>>> handy, I think.
>>>>>
>>>>> I also found two things, maybe they help you:
>>>>>  1. issue doesn't appear without parallelism
>>>>>  2. issue doesn't appear with a tiny datasets
>>>>>
>>>>> 2016-05-30 17:13 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>>>
>>>>>> You're right. I'm still looking into this, unfortunately I haven't
>>>>>> made progress so far. I'll keep you posted.
>>>>>>
>>>>>> On Sun, 29 May 2016 at 18:20 Pawel Szczur <[email protected]>
>>>>>> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I used the config as in the repo.
>>>>>>> Please grep the the log for "hereGoesLongStringID0,2", you will see
>>>>>>> that this key is processed multiple times.
>>>>>>>
>>>>>>> This is how I understand CoGroupByKey: one has two (or more)
>>>>>>> PCollection<KV<K,?>>. Both sets are grouped by key. For each unique key 
>>>>>>> a
>>>>>>> KV<K, CoGbkResult> is produced, a given CoGbkResult contains all values 
>>>>>>> from
>>>>>>> all input PCollections which have the given key.
>>>>>>>
>>>>>>> But from the log it seems that each key produced more than one
>>>>>>> CoGbkResult.
>>>>>>>
>>>>>>> The final counters didn't catch the bug because in your case, the
>>>>>>> value from dataset1 was replicated for each key.
>>>>>>>
>>>>>>> Cheers, Pawel
>>>>>>>
>>>>>>> 2016-05-29 15:59 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>> I ran your data generator with these configs:
>>>>>>>> p.apply(Create.of(new Config(3, 5, 600_000, 1)))
>>>>>>>>     .apply(ParDo.of(new Generator())).apply(
>>>>>>>>
>>>>>>>> AvroIO.Write.to("/tmp/dataset1").withSchema(DumbData.class).withNumShards(6));
>>>>>>>>
>>>>>>>> p.apply(Create.of(new Config(3, 5, 600_000, 2))).
>>>>>>>>     apply(ParDo.of(new Generator())).apply(
>>>>>>>>
>>>>>>>> AvroIO.Write.to("/tmp/dataset2").withSchema(DumbData.class).withNumShards(6));
>>>>>>>>
>>>>>>>> Then I ran the job with parallelism=6. I couldn't reproduce the
>>>>>>>> problem, this is the log file from one of several runs:
>>>>>>>> https://gist.github.com/aljoscha/ef1d804f57671cd472c75b92b4aee51b
>>>>>>>>
>>>>>>>> Could you please send me the exact config that you used. Btw, I ran
>>>>>>>> it inside an IDE, do the problems also occur in the IDE for you or 
>>>>>>>> only when
>>>>>>>> you execute on a cluster?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>> On Sun, 29 May 2016 at 01:51 Pawel Szczur <[email protected]>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi Aljoscha.
>>>>>>>>>
>>>>>>>>> I've created a repo with fake dataset to allow easily reproduce the
>>>>>>>>> problem:
>>>>>>>>> https://github.com/orian/cogroup-wrong-grouping
>>>>>>>>>
>>>>>>>>> What I noticed: if the dataset is too small the bug doesn't appear.
>>>>>>>>>
>>>>>>>>> You can modify the size of dataset, but in ideal case it should be
>>>>>>>>> few hundred thousands records per key (I guess it depends on the 
>>>>>>>>> machine you
>>>>>>>>> run it).
>>>>>>>>>
>>>>>>>>> Cheers, Pawel
>>>>>>>>>
>>>>>>>>> 2016-05-28 12:45 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> which version of Beam/Flink are you using.
>>>>>>>>>>
>>>>>>>>>> Could you maybe also provide example data and code that showcases
>>>>>>>>>> the problem? If you have concerns about sending it to a public list 
>>>>>>>>>> you can
>>>>>>>>>> also send it to me directly.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Aljoscha
>>>>>>>>>>
>>>>>>>>>> On Fri, 27 May 2016 at 20:53 Pawel Szczur <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Data description.
>>>>>>>>>>>
>>>>>>>>>>> I have two datasets.
>>>>>>>>>>>
>>>>>>>>>>> Records - the first, containes around 0.5-1M of records per
>>>>>>>>>>> (key,day). For testing I use 2-3 keys and 5-10 days of data. What I 
>>>>>>>>>>> shoot
>>>>>>>>>>> for is 1000+ keys. Each record contains key, timestamp in μ-seconds 
>>>>>>>>>>> and some
>>>>>>>>>>> other data.
>>>>>>>>>>> Configs - the second, is rather small. It describes the key in
>>>>>>>>>>> time, e.g. you can think about it as a list of tuples: (key, start 
>>>>>>>>>>> date, end
>>>>>>>>>>> date, description).
>>>>>>>>>>>
>>>>>>>>>>> For the exploration I've encoded the data as files of
>>>>>>>>>>> length-prefixed Protocol Buffer binary encoded messages. 
>>>>>>>>>>> Additionally the
>>>>>>>>>>> files are packed with gzip. Data is sharded by date. Each file is 
>>>>>>>>>>> around
>>>>>>>>>>> 10MB.
>>>>>>>>>>>
>>>>>>>>>>> Pipeline
>>>>>>>>>>>
>>>>>>>>>>> First I add keys to both datasets. For Records dataset it's (key,
>>>>>>>>>>> day rounded timestamp). For Configs a key is (key, day), where day 
>>>>>>>>>>> is each
>>>>>>>>>>> timestamp value between start date and end date (pointing midnight).
>>>>>>>>>>> The datasets are merged using CoGroupByKey.
>>>>>>>>>>>
>>>>>>>>>>> As a key type I use import org.apache.flink.api.java.tuple.Tuple2
>>>>>>>>>>> with a Tuple2Coder from this repo.
>>>>>>>>>>>
>>>>>>>>>>> The problem
>>>>>>>>>>>
>>>>>>>>>>> If the Records dataset is tiny like 5 days, everything seems fine
>>>>>>>>>>> (check normal_run.log).
>>>>>>>>>>>
>>>>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator
>>>>>>>>>>> values:
>>>>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:127) - item count :
>>>>>>>>>>> 4322332
>>>>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 0
>>>>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>>>>>>>>>>>
>>>>>>>>>>> When I run the pipeline against 10+ days I encounter an error
>>>>>>>>>>> pointing that for some Records there's no Config (wrong_run.log).
>>>>>>>>>>>
>>>>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator
>>>>>>>>>>> values:
>>>>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:127) - item count :
>>>>>>>>>>> 8577197
>>>>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 6
>>>>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>>>>>>>>>>>
>>>>>>>>>>> Then I've added some extra logging messages:
>>>>>>>>>>>
>>>>>>>>>>> (ConvertToItem.java:144) - 68643 items for KeyValue3 on:
>>>>>>>>>>> 1462665600000000
>>>>>>>>>>> (ConvertToItem.java:140) - no items for KeyValue3 on:
>>>>>>>>>>> 1463184000000000
>>>>>>>>>>> (ConvertToItem.java:123) - missing for KeyValue3 on:
>>>>>>>>>>> 1462924800000000
>>>>>>>>>>> (ConvertToItem.java:142) - 753707 items for KeyValue3 on:
>>>>>>>>>>> 1462924800000000 marked as no-loc
>>>>>>>>>>> (ConvertToItem.java:123) - missing for KeyValue3 on:
>>>>>>>>>>> 1462752000000000
>>>>>>>>>>> (ConvertToItem.java:142) - 749901 items for KeyValue3 on:
>>>>>>>>>>> 1462752000000000 marked as no-loc
>>>>>>>>>>> (ConvertToItem.java:144) - 754578 items for KeyValue3 on:
>>>>>>>>>>> 1462406400000000
>>>>>>>>>>> (ConvertToItem.java:144) - 751574 items for KeyValue3 on:
>>>>>>>>>>> 1463011200000000
>>>>>>>>>>> (ConvertToItem.java:123) - missing for KeyValue3 on:
>>>>>>>>>>> 1462665600000000
>>>>>>>>>>> (ConvertToItem.java:142) - 754758 items for KeyValue3 on:
>>>>>>>>>>> 1462665600000000 marked as no-loc
>>>>>>>>>>> (ConvertToItem.java:123) - missing for KeyValue3 on:
>>>>>>>>>>> 1463184000000000
>>>>>>>>>>> (ConvertToItem.java:142) - 694372 items for KeyValue3 on:
>>>>>>>>>>> 1463184000000000 marked as no-loc
>>>>>>>>>>>
>>>>>>>>>>> You can spot that in first line 68643 items were processed for
>>>>>>>>>>> KeyValue3 and time 1462665600000000.
>>>>>>>>>>> Later on in line 9 it seems the operation processes the same key
>>>>>>>>>>> again, but it reports that no Config was available for these 
>>>>>>>>>>> Records.
>>>>>>>>>>> The line 10 informs they've been marked as no-loc.
>>>>>>>>>>>
>>>>>>>>>>> The line 2 is saying that there were no items for KeyValue3 and
>>>>>>>>>>> time 1463184000000000, but in line 11 you can read that the items 
>>>>>>>>>>> for this
>>>>>>>>>>> (key,day) pair were processed later and they've lacked a Config.
>>>>>>>>>>>
>>>>>>>>>>> Work-around (after more testing, doesn't work, staying with
>>>>>>>>>>> Tuple2)
>>>>>>>>>>>
>>>>>>>>>>> I've switched from using Tuple2 to a Protocol Buffer message:
>>>>>>>>>>>
>>>>>>>>>>> message KeyDay {
>>>>>>>>>>>   optional ByteString key = 1;
>>>>>>>>>>>   optional int64 timestamp_usec = 2;
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> But using Tuple2.of() was just easier than:
>>>>>>>>>>> KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build().
>>>>>>>>>>>
>>>>>>>>>>> // The original description comes from:
>>>>>>>>>>> http://stackoverflow.com/questions/37473682/items-not-groupped-correctly-cogroupbykey
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>
>>>
>>
>

Reply via email to