I've just tested it. It fails.

Also added the test to the repo:
https://github.com/orian/cogroup-wrong-grouping

I reason, this means that GroupByKey is flawed? If you open an official
issue, please add it to discussion.

2016-05-31 11:55 GMT+02:00 Aljoscha Krettek <[email protected]>:

> Does 2. work for the cases where CoGroupByKey fails? Reason I'm asking is
> that CoGroupByKey is essentially implemented like that internally: create
> tagged union -> flatten -> GroupByKey.
>
> On Tue, 31 May 2016 at 01:16 Pawel Szczur <[email protected]> wrote:
>
>> I've naively tried few other key types, it seems to be unrelated to key
>> type.
>>
>> As for now I have two workarounds and ignorance:
>>  1. If there is one dominant dataset and other datasets are small (size
>> << GB) then I use SideInput.
>>  2. If I have multiple datasets of similar size I enclose it in a common
>> container, flatten it and GroupByKey.
>>  3. I measure occurrences and ignore the bug for now.
>>
>> Do you have an idea how a test for this may be constructed? It seems
>> handy, I think.
>>
>> I also found two things, maybe they help you:
>>  1. issue doesn't appear without parallelism
>>  2. issue doesn't appear with a tiny datasets
>>
>> 2016-05-30 17:13 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>
>>> You're right. I'm still looking into this, unfortunately I haven't made
>>> progress so far. I'll keep you posted.
>>>
>>> On Sun, 29 May 2016 at 18:20 Pawel Szczur <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> I used the config as in the repo.
>>>> Please grep the the log for "hereGoesLongStringID0,2", you will see
>>>> that this key is processed multiple times.
>>>>
>>>> This is how I understand CoGroupByKey: one has two (or more)
>>>> PCollection<KV<K,?>>. Both sets are grouped by key. For each unique key a
>>>> KV<K, CoGbkResult> is produced, a given CoGbkResult contains all values
>>>> from all input PCollections which have the given key.
>>>>
>>>> But from the log it seems that each key produced more than one
>>>> CoGbkResult.
>>>>
>>>> The final counters didn't catch the bug because in your case, the value
>>>> from dataset1 was replicated for each key.
>>>>
>>>> Cheers, Pawel
>>>>
>>>> 2016-05-29 15:59 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>
>>>>> Hi,
>>>>> I ran your data generator with these configs:
>>>>> p.apply(Create.of(new Config(3, 5, 600_000, 1)))
>>>>>     .apply(ParDo.of(new Generator())).apply(
>>>>>         AvroIO.Write.to
>>>>> ("/tmp/dataset1").withSchema(DumbData.class).withNumShards(6));
>>>>>
>>>>> p.apply(Create.of(new Config(3, 5, 600_000, 2))).
>>>>>     apply(ParDo.of(new Generator())).apply(
>>>>>         AvroIO.Write.to
>>>>> ("/tmp/dataset2").withSchema(DumbData.class).withNumShards(6));
>>>>>
>>>>> Then I ran the job with parallelism=6. I couldn't reproduce the
>>>>> problem, this is the log file from one of several runs:
>>>>> https://gist.github.com/aljoscha/ef1d804f57671cd472c75b92b4aee51b
>>>>>
>>>>> Could you please send me the exact config that you used. Btw, I ran it
>>>>> inside an IDE, do the problems also occur in the IDE for you or only when
>>>>> you execute on a cluster?
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Sun, 29 May 2016 at 01:51 Pawel Szczur <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Aljoscha.
>>>>>>
>>>>>> I've created a repo with fake dataset to allow easily reproduce the
>>>>>> problem:
>>>>>> https://github.com/orian/cogroup-wrong-grouping
>>>>>>
>>>>>> What I noticed: if the dataset is too small the bug doesn't appear.
>>>>>>
>>>>>> You can modify the size of dataset, but in ideal case it should be
>>>>>> few hundred thousands records per key (I guess it depends on the machine
>>>>>> you run it).
>>>>>>
>>>>>> Cheers, Pawel
>>>>>>
>>>>>> 2016-05-28 12:45 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>>>
>>>>>>> Hi,
>>>>>>> which version of Beam/Flink are you using.
>>>>>>>
>>>>>>> Could you maybe also provide example data and code that showcases
>>>>>>> the problem? If you have concerns about sending it to a public list you 
>>>>>>> can
>>>>>>> also send it to me directly.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On Fri, 27 May 2016 at 20:53 Pawel Szczur <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> *Data description.*
>>>>>>>>
>>>>>>>> I have two datasets.
>>>>>>>>
>>>>>>>> Records - the first, containes around 0.5-1M of records per
>>>>>>>> (key,day). For testing I use 2-3 keys and 5-10 days of data. What I 
>>>>>>>> shoot
>>>>>>>> for is 1000+ keys. Each record contains key, timestamp in μ-seconds and
>>>>>>>> some other data.
>>>>>>>> Configs - the second, is rather small. It describes the key in
>>>>>>>> time, e.g. you can think about it as a list of tuples: (key, start 
>>>>>>>> date,
>>>>>>>> end date, description).
>>>>>>>>
>>>>>>>> For the exploration I've encoded the data as files of
>>>>>>>> length-prefixed Protocol Buffer binary encoded messages. Additionally 
>>>>>>>> the
>>>>>>>> files are packed with gzip. Data is sharded by date. Each file is 
>>>>>>>> around
>>>>>>>> 10MB.
>>>>>>>>
>>>>>>>> *Pipeline*
>>>>>>>>
>>>>>>>> First I add keys to both datasets. For Records dataset it's (key,
>>>>>>>> day rounded timestamp). For Configs a key is (key, day), where day is 
>>>>>>>> each
>>>>>>>> timestamp value between start date and end date (pointing midnight).
>>>>>>>> The datasets are merged using CoGroupByKey.
>>>>>>>>
>>>>>>>> As a key type I use import org.apache.flink.api.java.tuple.Tuple2
>>>>>>>> with a Tuple2Coder from this repo.
>>>>>>>>
>>>>>>>> *The problem*
>>>>>>>>
>>>>>>>> If the Records dataset is tiny like 5 days, everything seems fine
>>>>>>>> (check normal_run.log).
>>>>>>>>
>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator
>>>>>>>> values:
>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:127) - item count : 4322332
>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 0
>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>>>>>>>>
>>>>>>>> When I run the pipeline against 10+ days I encounter an error
>>>>>>>> pointing that for some Records there's no Config (wrong_run.log).
>>>>>>>>
>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator
>>>>>>>> values:
>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:127) - item count : 8577197
>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 6
>>>>>>>>  INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>>>>>>>>
>>>>>>>> Then I've added some extra logging messages:
>>>>>>>>
>>>>>>>> (ConvertToItem.java:144) - 68643 items for KeyValue3 on:
>>>>>>>> 1462665600000000
>>>>>>>> (ConvertToItem.java:140) - no items for KeyValue3 on:
>>>>>>>> 1463184000000000
>>>>>>>> (ConvertToItem.java:123) - missing for KeyValue3 on:
>>>>>>>> 1462924800000000
>>>>>>>> (ConvertToItem.java:142) - 753707 items for KeyValue3 on:
>>>>>>>> 1462924800000000 marked as no-loc
>>>>>>>> (ConvertToItem.java:123) - missing for KeyValue3 on:
>>>>>>>> 1462752000000000
>>>>>>>> (ConvertToItem.java:142) - 749901 items for KeyValue3 on:
>>>>>>>> 1462752000000000 marked as no-loc
>>>>>>>> (ConvertToItem.java:144) - 754578 items for KeyValue3 on:
>>>>>>>> 1462406400000000
>>>>>>>> (ConvertToItem.java:144) - 751574 items for KeyValue3 on:
>>>>>>>> 1463011200000000
>>>>>>>> (ConvertToItem.java:123) - missing for KeyValue3 on:
>>>>>>>> 1462665600000000
>>>>>>>> (ConvertToItem.java:142) - 754758 items for KeyValue3 on:
>>>>>>>> 1462665600000000 marked as no-loc
>>>>>>>> (ConvertToItem.java:123) - missing for KeyValue3 on:
>>>>>>>> 1463184000000000
>>>>>>>> (ConvertToItem.java:142) - 694372 items for KeyValue3 on:
>>>>>>>> 1463184000000000 marked as no-loc
>>>>>>>>
>>>>>>>> You can spot that in first line 68643 items were processed for
>>>>>>>> KeyValue3 and time 1462665600000000.
>>>>>>>> Later on in line 9 it seems the operation processes the same key
>>>>>>>> again, but it reports that no Config was available for these Records.
>>>>>>>> The line 10 informs they've been marked as no-loc.
>>>>>>>>
>>>>>>>> The line 2 is saying that there were no items for KeyValue3 and
>>>>>>>> time 1463184000000000, but in line 11 you can read that the items for 
>>>>>>>> this
>>>>>>>> (key,day) pair were processed later and they've lacked a Config.
>>>>>>>>
>>>>>>>> *Work-around (after more testing, doesn't work, staying with
>>>>>>>> Tuple2)*
>>>>>>>>
>>>>>>>> I've switched from using Tuple2 to a Protocol Buffer message:
>>>>>>>>
>>>>>>>> message KeyDay {
>>>>>>>>   optional ByteString key = 1;
>>>>>>>>   optional int64 timestamp_usec = 2;
>>>>>>>> }
>>>>>>>>
>>>>>>>> But using Tuple2.of() was just easier than:
>>>>>>>> KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build().
>>>>>>>>
>>>>>>>> // The original description comes from:
>>>>>>>> http://stackoverflow.com/questions/37473682/items-not-groupped-correctly-cogroupbykey
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>

Reply via email to