Hi,
I ran your data generator with these configs:
p.apply(Create.of(new Config(3, 5, 600_000, 1)))
    .apply(ParDo.of(new Generator()))
    .apply(AvroIO.Write.to("/tmp/dataset1")
        .withSchema(DumbData.class)
        .withNumShards(6));

p.apply(Create.of(new Config(3, 5, 600_000, 2)))
    .apply(ParDo.of(new Generator()))
    .apply(AvroIO.Write.to("/tmp/dataset2")
        .withSchema(DumbData.class)
        .withNumShards(6));

Then I ran the job with parallelism=6. I couldn't reproduce the problem;
this is the log file from one of several runs:
https://gist.github.com/aljoscha/ef1d804f57671cd472c75b92b4aee51b
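
For reference, this is roughly how I set the parallelism (just a sketch;
I'm assuming the FlinkPipelineOptions route here, on a cluster the
parallelism could instead come from flink-conf.yaml):

FlinkPipelineOptions options =
    PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkPipelineRunner.class);
// Matches the parallelism=6 run mentioned above.
options.setParallelism(6);
Pipeline p = Pipeline.create(options);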

Could you please send me the exact config that you used? Btw, I ran it
inside an IDE; do the problems also occur in the IDE for you, or only when
you execute on a cluster?

Cheers,
Aljoscha

On Sun, 29 May 2016 at 01:51 Pawel Szczur <[email protected]> wrote:

> Hi Aljoscha.
>
> I've created a repo with a fake dataset to make it easy to reproduce the
> problem:
> https://github.com/orian/cogroup-wrong-grouping
>
> What I noticed: if the dataset is too small, the bug doesn't appear.
>
> You can modify the size of the dataset, but ideally it should be a few
> hundred thousand records per key (I guess it depends on the machine you
> run it on).
>
> Cheers, Pawel
>
> 2016-05-28 12:45 GMT+02:00 Aljoscha Krettek <[email protected]>:
>
>> Hi,
>> Which version of Beam/Flink are you using?
>>
>> Could you maybe also provide example data and code that showcases the
>> problem? If you have concerns about sending it to a public list you can
>> also send it to me directly.
>>
>> Cheers,
>> Aljoscha
>>
>> On Fri, 27 May 2016 at 20:53 Pawel Szczur <[email protected]> wrote:
>>
>>> *Data description.*
>>>
>>> I have two datasets.
>>>
>>> Records - the first dataset; it contains around 0.5-1M records per
>>> (key, day). For testing I use 2-3 keys and 5-10 days of data. What I'm
>>> aiming for is 1000+ keys. Each record contains a key, a timestamp in
>>> microseconds, and some other data.
>>> Configs - the second dataset; it's rather small. It describes each key
>>> over time, e.g. you can think of it as a list of tuples: (key, start
>>> date, end date, description).
>>>
>>> For the exploration I've encoded the data as files of length-prefixed,
>>> binary-encoded Protocol Buffer messages. Additionally, the files are
>>> compressed with gzip. The data is sharded by date; each file is around
>>> 10MB.
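>>>
>>> In case it helps, a minimal sketch of how one such file can be read
>>> (assuming the length prefix is protobuf's standard varint delimiter;
>>> RecordProto and the file name are made up, not from my actual code):
>>>
>>> import java.io.FileInputStream;
>>> import java.io.InputStream;
>>> import java.util.zip.GZIPInputStream;
>>>
>>> try (InputStream in =
>>>     new GZIPInputStream(new FileInputStream("records-2016-05-08.pb.gz"))) {
>>>   RecordProto record;
>>>   // parseDelimitedFrom reads one varint-length-prefixed message and
>>>   // returns null at end of stream.
>>>   while ((record = RecordProto.parseDelimitedFrom(in)) != null) {
>>>     // process record: key, timestamp in microseconds, other data
>>>   }
>>> }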
>>>
>>> *Pipeline*
>>>
>>> First I add keys to both datasets. For the Records dataset the key is
>>> (key, day-rounded timestamp). For Configs the key is (key, day), where
>>> day is each midnight timestamp between start date and end date (a rough
>>> sketch of this expansion follows below).
>>> The datasets are merged using CoGroupByKey.
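>>>
>>> Here's what that expansion could look like (class and field names are
>>> hypothetical, not the ones from the repo):
>>>
>>> static class ExpandConfigByDay
>>>     extends DoFn<Config, KV<Tuple2<String, Long>, Config>> {
>>>   private static final long DAY_USEC = 24L * 60 * 60 * 1_000_000;
>>>
>>>   @Override
>>>   public void processElement(ProcessContext c) {
>>>     Config config = c.element();
>>>     // Emit one (key, midnight timestamp) pair per day between start
>>>     // and end date, inclusive.
>>>     for (long day = config.getStartUsec(); day <= config.getEndUsec();
>>>         day += DAY_USEC) {
>>>       c.output(KV.of(Tuple2.of(config.getKey(), day), config));
>>>     }
>>>   }
>>> }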
>>>
>>> As the key type I use org.apache.flink.api.java.tuple.Tuple2 with a
>>> Tuple2Coder from this repo.
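>>>
>>> And the CoGroupByKey itself looks roughly like this (tag names are
>>> mine; keyedRecords and keyedConfigs stand for the two keyed
>>> PCollections from the steps above):
>>>
>>> TupleTag<Record> recordTag = new TupleTag<>();
>>> TupleTag<Config> configTag = new TupleTag<>();
>>>
>>> PCollection<KV<Tuple2<String, Long>, CoGbkResult>> joined =
>>>     KeyedPCollectionTuple.of(recordTag, keyedRecords)
>>>         .and(configTag, keyedConfigs)
>>>         .apply(CoGroupByKey.<Tuple2<String, Long>>create());
>>> // Each CoGbkResult then exposes getAll(recordTag) and getOnly(configTag).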
>>>
>>> *The problem*
>>>
>>> If the Records dataset is tiny, like 5 days, everything seems fine (check
>>> normal_run.log):
>>>
>>>  INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
>>>  INFO [main] (FlinkPipelineRunner.java:127) - item count : 4322332
>>>  INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 0
>>>  INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>>>
>>> When I run the pipeline against 10+ days, I encounter errors indicating
>>> that for some Records there is no Config (wrong_run.log):
>>>
>>>  INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
>>>  INFO [main] (FlinkPipelineRunner.java:127) - item count : 8577197
>>>  INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 6
>>>  INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>>>
>>> Then I added some extra log messages:
>>>
>>> (ConvertToItem.java:144) - 68643 items for KeyValue3 on: 1462665600000000
>>> (ConvertToItem.java:140) - no items for KeyValue3 on: 1463184000000000
>>> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462924800000000
>>> (ConvertToItem.java:142) - 753707 items for KeyValue3 on: 1462924800000000 marked as no-loc
>>> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462752000000000
>>> (ConvertToItem.java:142) - 749901 items for KeyValue3 on: 1462752000000000 marked as no-loc
>>> (ConvertToItem.java:144) - 754578 items for KeyValue3 on: 1462406400000000
>>> (ConvertToItem.java:144) - 751574 items for KeyValue3 on: 1463011200000000
>>> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462665600000000
>>> (ConvertToItem.java:142) - 754758 items for KeyValue3 on: 1462665600000000 marked as no-loc
>>> (ConvertToItem.java:123) - missing for KeyValue3 on: 1463184000000000
>>> (ConvertToItem.java:142) - 694372 items for KeyValue3 on: 1463184000000000 marked as no-loc
>>>
>>> You can see that in line 1, 68643 items were processed for KeyValue3 at
>>> time 1462665600000000.
>>> Later, in line 9, the operation seems to process the same key again, but
>>> it reports that no Config was available for these Records. Line 10 shows
>>> that they've been marked as no-loc.
>>>
>>> Line 2 says there were no items for KeyValue3 at time 1463184000000000,
>>> but line 11 shows that the items for this (key, day) pair were processed
>>> later and lacked a Config.
>>>
>>> *Work-around (after more testing, doesn't work, staying with Tuple2)*
>>>
>>> I've switched from using Tuple2 to a Protocol Buffer message:
>>>
>>> message KeyDay {
>>>   optional bytes key = 1;
>>>   optional int64 timestamp_usec = 2;
>>> }
>>>
>>> But using Tuple2.of() was just easier than:
>>> KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build().
>>>
>>> // The original description comes from:
>>> http://stackoverflow.com/questions/37473682/items-not-groupped-correctly-cogroupbykey
>>>
>>
>
