Hi Aljoscha.

I've created a repo with a fake dataset to make it easy to reproduce the problem:
https://github.com/orian/cogroup-wrong-grouping

What I noticed: if the dataset is too small the bug doesn't appear.

You can modify the size of the dataset, but ideally it should be a few
hundred thousand records per key (I guess it depends on the machine you
run it on).

Cheers, Pawel

2016-05-28 12:45 GMT+02:00 Aljoscha Krettek <[email protected]>:

> Hi,
> which version of Beam/Flink are you using?
>
> Could you maybe also provide example data and code that showcases the
> problem? If you have concerns about sending it to a public list you can
> also send it to me directly.
>
> Cheers,
> Aljoscha
>
> On Fri, 27 May 2016 at 20:53 Pawel Szczur <[email protected]> wrote:
>
>> *Data description.*
>>
>> I have two datasets.
>>
>> Records - the first contains around 0.5-1M records per (key, day). For
>> testing I use 2-3 keys and 5-10 days of data. What I'm aiming for is 1000+
>> keys. Each record contains a key, a timestamp in microseconds, and some
>> other data.
>> Configs - the second is rather small. It describes the key in time; you
>> can think of it as a list of tuples: (key, start date, end date,
>> description).
>>
>> For the exploration I've encoded the data as files of length-prefixed,
>> binary-encoded Protocol Buffer messages. Additionally, the files are
>> compressed with gzip. Data is sharded by date. Each file is around 10MB.
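>> To make the framing concrete, here is a minimal, self-contained sketch of
>> length-prefixed records inside a gzip stream. It uses fixed 4-byte lengths
>> for illustration only; the real files use Protocol Buffers' varint-delimited
>> encoding (writeDelimitedTo / parseDelimitedFrom). Class and method names
>> below are mine, not from the repo.

```java
import java.io.*;
import java.util.*;
import java.util.zip.*;

public class LengthPrefixedGzip {
    // Write each record as a 4-byte big-endian length followed by its bytes,
    // all wrapped in a single gzip stream.
    static byte[] pack(List<byte[]> records) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(new GZIPOutputStream(bos))) {
            for (byte[] r : records) {
                out.writeInt(r.length);
                out.write(r);
            }
        }
        return bos.toByteArray();
    }

    // Read records back until the gzip stream is exhausted.
    static List<byte[]> unpack(byte[] file) throws IOException {
        List<byte[]> records = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(
                new GZIPInputStream(new ByteArrayInputStream(file)))) {
            while (true) {
                int len;
                try {
                    len = in.readInt();
                } catch (EOFException e) {
                    break; // clean end of stream
                }
                byte[] r = new byte[len];
                in.readFully(r);
                records.add(r);
            }
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        List<byte[]> in = Arrays.asList("rec1".getBytes(), "record-two".getBytes());
        List<byte[]> out = unpack(pack(in));
        System.out.println(out.size());             // 2
        System.out.println(new String(out.get(1))); // record-two
    }
}
```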
>>
>> *Pipeline*
>>
>> First I add keys to both datasets. For the Records dataset the key is
>> (key, timestamp rounded down to the day). For Configs the key is (key,
>> day), where day is each midnight timestamp between the start date and
>> the end date.
>> The datasets are merged using CoGroupByKey.
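>> The day rounding itself is just truncation of the microsecond timestamp to
>> midnight UTC; a minimal sketch (the class and method names are mine, not
>> from the repo):

```java
public class DayKey {
    private static final long DAY_USEC = 86_400_000_000L; // microseconds per day

    // Round a microsecond timestamp down to midnight UTC of the same day.
    static long dayKey(long timestampUsec) {
        return timestampUsec - (timestampUsec % DAY_USEC);
    }

    public static void main(String[] args) {
        // 1462665600000000 is 2016-05-08 00:00:00 UTC, one of the values in the logs.
        System.out.println(dayKey(1462665600000000L + 12_345L)); // 1462665600000000
    }
}
```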
>>
>> As a key type I use org.apache.flink.api.java.tuple.Tuple2 with a
>> Tuple2Coder from this repo.
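>> For reference, the behavior I expect from CoGroupByKey (every element of
>> both datasets for a given key lands in exactly one group) can be mimicked
>> with plain Java collections. This is only an illustration of the grouping
>> semantics, not Beam code, and all names in it are mine:

```java
import java.util.*;

public class CoGroupSketch {
    // Group two keyed datasets; each key appears exactly once in the result,
    // carrying all of its records and all of its configs together.
    static Map<String, List<List<String>>> coGroup(Map<String, List<String>> records,
                                                   Map<String, List<String>> configs) {
        Map<String, List<List<String>>> out = new HashMap<>();
        Set<String> keys = new HashSet<>(records.keySet());
        keys.addAll(configs.keySet());
        for (String k : keys) {
            out.put(k, Arrays.asList(
                records.getOrDefault(k, Collections.emptyList()),
                configs.getOrDefault(k, Collections.emptyList())));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> recs = Map.of("KeyValue3|day1", List.of("r1", "r2"));
        Map<String, List<String>> cfgs = Map.of("KeyValue3|day1", List.of("cfg"));
        // One group per key: the records and their config arrive together.
        System.out.println(coGroup(recs, cfgs));
    }
}
```

>> The bug below looks like a violation of exactly this property: the same
>> (key, day) shows up in more than one group.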
>>
>> *The problem*
>>
>> If the Records dataset is tiny, e.g. 5 days, everything seems fine (see
>> normal_run.log).
>>
>>  INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
>>  INFO [main] (FlinkPipelineRunner.java:127) - item count : 4322332
>>  INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 0
>>  INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>>
>> When I run the pipeline against 10+ days, I encounter errors indicating
>> that for some Records there is no Config (wrong_run.log).
>>
>>  INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
>>  INFO [main] (FlinkPipelineRunner.java:127) - item count : 8577197
>>  INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 6
>>  INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>>
>> Then I've added some extra logging messages:
>>
>> (ConvertToItem.java:144) - 68643 items for KeyValue3 on: 1462665600000000
>> (ConvertToItem.java:140) - no items for KeyValue3 on: 1463184000000000
>> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462924800000000
>> (ConvertToItem.java:142) - 753707 items for KeyValue3 on: 1462924800000000 marked as no-loc
>> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462752000000000
>> (ConvertToItem.java:142) - 749901 items for KeyValue3 on: 1462752000000000 marked as no-loc
>> (ConvertToItem.java:144) - 754578 items for KeyValue3 on: 1462406400000000
>> (ConvertToItem.java:144) - 751574 items for KeyValue3 on: 1463011200000000
>> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462665600000000
>> (ConvertToItem.java:142) - 754758 items for KeyValue3 on: 1462665600000000 marked as no-loc
>> (ConvertToItem.java:123) - missing for KeyValue3 on: 1463184000000000
>> (ConvertToItem.java:142) - 694372 items for KeyValue3 on: 1463184000000000 marked as no-loc
>>
>> You can spot that in line 1, 68643 items were processed for KeyValue3 and
>> time 1462665600000000.
>> Later, in line 9, the operation processes the same key again, but reports
>> that no Config was available for these Records; line 10 shows they've
>> been marked as no-loc.
>>
>> Line 2 says there were no items for KeyValue3 and time 1463184000000000,
>> but lines 11-12 show that items for this (key, day) pair were processed
>> later and that they lacked a Config.
>>
>> *Work-around (after more testing it turned out not to help, so I'm staying with Tuple2)*
>>
>> I've switched from using Tuple2 to a Protocol Buffer message:
>>
>> message KeyDay {
>>   optional bytes key = 1;
>>   optional int64 timestamp_usec = 2;
>> }
>>
>> But using Tuple2.of() was just easier than:
>> KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build().
>>
>> // The original description comes from:
>> http://stackoverflow.com/questions/37473682/items-not-groupped-correctly-cogroupbykey
>>
>
