You're right. I'm still looking into this; unfortunately, I haven't made progress so far. I'll keep you posted.
On Sun, 29 May 2016 at 18:20 Pawel Szczur <[email protected]> wrote:
> Hi,
>
> I used the config as in the repo.
> Please grep the log for "hereGoesLongStringID0,2"; you will see that
> this key is processed multiple times.
>
> This is how I understand CoGroupByKey: one has two (or more)
> PCollection<KV<K,?>>. Both sets are grouped by key. For each unique key a
> KV<K, CoGbkResult> is produced; a given CoGbkResult contains all values
> from all input PCollections which have the given key.
>
> But from the log it seems that each key produced more than one CoGbkResult.
>
> The final counters didn't catch the bug because in your case the value
> from dataset1 was replicated for each key.
>
> Cheers, Pawel
>
> 2016-05-29 15:59 GMT+02:00 Aljoscha Krettek <[email protected]>:
>
>> Hi,
>> I ran your data generator with these configs:
>>
>> p.apply(Create.of(new Config(3, 5, 600_000, 1)))
>>     .apply(ParDo.of(new Generator()))
>>     .apply(AvroIO.Write.to("/tmp/dataset1")
>>         .withSchema(DumbData.class).withNumShards(6));
>>
>> p.apply(Create.of(new Config(3, 5, 600_000, 2)))
>>     .apply(ParDo.of(new Generator()))
>>     .apply(AvroIO.Write.to("/tmp/dataset2")
>>         .withSchema(DumbData.class).withNumShards(6));
>>
>> Then I ran the job with parallelism=6. I couldn't reproduce the problem;
>> this is the log file from one of several runs:
>> https://gist.github.com/aljoscha/ef1d804f57671cd472c75b92b4aee51b
>>
>> Could you please send me the exact config that you used? Btw, I ran it
>> inside an IDE; do the problems also occur in the IDE for you, or only when
>> you execute on a cluster?
>>
>> Cheers,
>> Aljoscha
>>
>> On Sun, 29 May 2016 at 01:51 Pawel Szczur <[email protected]> wrote:
>>
>>> Hi Aljoscha,
>>>
>>> I've created a repo with a fake dataset to make the problem easy to
>>> reproduce:
>>> https://github.com/orian/cogroup-wrong-grouping
>>>
>>> What I noticed: if the dataset is too small, the bug doesn't appear.
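Pawel's reading of CoGroupByKey matches the transform's contract. In the Beam Java SDK, the inputs are tagged with KeyedPCollectionTuple.of(tag, pcollection).and(tag, pcollection) and grouped with CoGroupByKey.create(). Each output KV<K, CoGbkResult> then exposes one input's values through CoGbkResult.getAll(tag). A key is emitted once per window; in a batch job using the default global window, that means once per key. The block below is not Beam code. It is a small in-memory model of that contract, with hypothetical names (CoGbkModel, Grouped, coGroup, duplicateKeys), plus a check for the symptom described above: one key showing up in more than one grouped result.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CoGbkModel {

    /** Hypothetical in-memory stand-in for a CoGbkResult: every value for one key, per input. */
    public static final class Grouped<V1, V2> {
        public final List<V1> first = new ArrayList<>();
        public final List<V2> second = new ArrayList<>();
    }

    /** Convenience factory for a key/value pair (plays the role of KV.of). */
    public static <K, V> Map.Entry<K, V> kv(K key, V value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    /** Groups two keyed inputs; the result has exactly one Grouped entry per distinct key. */
    public static <K, V1, V2> Map<K, Grouped<V1, V2>> coGroup(
            List<Map.Entry<K, V1>> first, List<Map.Entry<K, V2>> second) {
        Map<K, Grouped<V1, V2>> out = new LinkedHashMap<>();
        for (Map.Entry<K, V1> e : first) {
            out.computeIfAbsent(e.getKey(), k -> new Grouped<>()).first.add(e.getValue());
        }
        for (Map.Entry<K, V2> e : second) {
            out.computeIfAbsent(e.getKey(), k -> new Grouped<>()).second.add(e.getValue());
        }
        return out;
    }

    /** Given the key of every emitted grouped result, returns the keys emitted more than once. */
    public static <K> Map<K, Integer> duplicateKeys(List<K> emittedKeys) {
        Map<K, Integer> counts = new HashMap<>();
        for (K key : emittedKeys) {
            counts.merge(key, 1, Integer::sum);
        }
        counts.values().removeIf(c -> c < 2);
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = Arrays.asList(kv("k1", 1), kv("k1", 2), kv("k2", 3));
        List<Map.Entry<String, String>> configs = Arrays.asList(kv("k1", "cfgA"));
        coGroup(records, configs).forEach((key, g) ->
                System.out.println(key + " -> first=" + g.first + " second=" + g.second));
    }
}
```

To apply the check to a real run, one would collect the key of every CoGbkResult the downstream DoFn receives (for example, by logging it) and pass that list to duplicateKeys. With correct grouping, it returns an empty map.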
>>>
>>> You can modify the size of the dataset, but ideally it should be a few
>>> hundred thousand records per key (I guess it depends on the machine you
>>> run it on).
>>>
>>> Cheers, Pawel
>>>
>>> 2016-05-28 12:45 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>
>>>> Hi,
>>>> Which version of Beam/Flink are you using?
>>>>
>>>> Could you maybe also provide example data and code that showcases the
>>>> problem? If you have concerns about sending it to a public list, you can
>>>> also send it to me directly.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Fri, 27 May 2016 at 20:53 Pawel Szczur <[email protected]>
>>>> wrote:
>>>>
>>>>> *Data description.*
>>>>>
>>>>> I have two datasets.
>>>>>
>>>>> Records - the first one, contains around 0.5-1M records per (key, day).
>>>>> For testing I use 2-3 keys and 5-10 days of data. What I'm aiming for
>>>>> is 1000+ keys. Each record contains a key, a timestamp in microseconds
>>>>> and some other data.
>>>>> Configs - the second one, is rather small. It describes the key over
>>>>> time, e.g. you can think of it as a list of tuples: (key, start date,
>>>>> end date, description).
>>>>>
>>>>> For the exploration I've encoded the data as files of length-prefixed
>>>>> Protocol Buffer binary-encoded messages. Additionally, the files are
>>>>> compressed with gzip. Data is sharded by date. Each file is around 10MB.
>>>>>
>>>>> *Pipeline*
>>>>>
>>>>> First I add keys to both datasets. For the Records dataset it's (key,
>>>>> day-rounded timestamp). For Configs the key is (key, day), where day is
>>>>> each timestamp value between start date and end date (pointing at
>>>>> midnight).
>>>>> The datasets are merged using CoGroupByKey.
>>>>>
>>>>> As a key type I use org.apache.flink.api.java.tuple.Tuple2 with
>>>>> a Tuple2Coder from this repo.
>>>>>
>>>>> *The problem*
>>>>>
>>>>> If the Records dataset is tiny, like 5 days, everything seems fine
>>>>> (check normal_run.log).
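Here is a sketch of this keying step. It assumes UTC day boundaries: the timestamps quoted in this thread, e.g. 1462665600000000, are all exact multiples of 86,400,000,000 μs, i.e. UTC midnights. It also assumes Config ranges whose end date is inclusive, which the thread does not specify. KeyDay is a hypothetical value type standing in for Tuple2 or the KeyDay proto. A Record maps to exactly one (key, midnight) pair and a Config to one pair per covered day, so a Record can only meet its Config in CoGroupByKey if both sides build an equal key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class DayKeys {
    // Microseconds in one day; timestamps in the thread are in microseconds.
    public static final long USEC_PER_DAY = 86_400L * 1_000_000L;

    /** Hypothetical (key, day) pair standing in for Tuple2 or the KeyDay proto. */
    public static final class KeyDay {
        public final String key;
        public final long dayUsec;  // midnight UTC, microseconds since the epoch

        public KeyDay(String key, long dayUsec) {
            this.key = key;
            this.dayUsec = dayUsec;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof KeyDay)) {
                return false;
            }
            KeyDay other = (KeyDay) o;
            return Objects.equals(key, other.key) && dayUsec == other.dayUsec;
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, dayUsec);
        }

        @Override
        public String toString() {
            return "(" + key + ", " + dayUsec + ")";
        }
    }

    /** Rounds down to midnight UTC; floorDiv keeps pre-1970 timestamps on the right day. */
    public static long dayStartUsec(long timestampUsec) {
        return Math.floorDiv(timestampUsec, USEC_PER_DAY) * USEC_PER_DAY;
    }

    /** A Record is keyed by the UTC day that contains its timestamp. */
    public static KeyDay recordKey(String key, long timestampUsec) {
        return new KeyDay(key, dayStartUsec(timestampUsec));
    }

    /** A Config yields one key per UTC midnight, from the day containing startUsec up to endUsec (inclusive, assumed). */
    public static List<KeyDay> configKeys(String key, long startUsec, long endUsec) {
        List<KeyDay> keys = new ArrayList<>();
        for (long day = dayStartUsec(startUsec); day <= endUsec; day += USEC_PER_DAY) {
            keys.add(new KeyDay(key, day));
        }
        return keys;
    }

    public static void main(String[] args) {
        long ts = 1462665600000000L + 3_600_000_000L;  // one hour after a UTC midnight
        System.out.println(recordKey("KeyValue3", ts));
        System.out.println(configKeys("KeyValue3", 1462406400000000L, 1462665600000000L));
    }
}
```

Keying each side this way and comparing the sets of keys produced is also a cheap way to tell two cases apart. One is a real data gap: there is no Config key for that day. The other is a grouping problem: the key exists on both sides, but the Records still arrive without a Config.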
>>>>>
>>>>> INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
>>>>> INFO [main] (FlinkPipelineRunner.java:127) - item count : 4322332
>>>>> INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 0
>>>>> INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>>>>>
>>>>> When I run the pipeline against 10+ days, I encounter an error indicating
>>>>> that for some Records there's no Config (wrong_run.log).
>>>>>
>>>>> INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
>>>>> INFO [main] (FlinkPipelineRunner.java:127) - item count : 8577197
>>>>> INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 6
>>>>> INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>>>>>
>>>>> Then I added some extra logging messages:
>>>>>
>>>>> (ConvertToItem.java:144) - 68643 items for KeyValue3 on: 1462665600000000
>>>>> (ConvertToItem.java:140) - no items for KeyValue3 on: 1463184000000000
>>>>> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462924800000000
>>>>> (ConvertToItem.java:142) - 753707 items for KeyValue3 on: 1462924800000000 marked as no-loc
>>>>> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462752000000000
>>>>> (ConvertToItem.java:142) - 749901 items for KeyValue3 on: 1462752000000000 marked as no-loc
>>>>> (ConvertToItem.java:144) - 754578 items for KeyValue3 on: 1462406400000000
>>>>> (ConvertToItem.java:144) - 751574 items for KeyValue3 on: 1463011200000000
>>>>> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462665600000000
>>>>> (ConvertToItem.java:142) - 754758 items for KeyValue3 on: 1462665600000000 marked as no-loc
>>>>> (ConvertToItem.java:123) - missing for KeyValue3 on: 1463184000000000
>>>>> (ConvertToItem.java:142) - 694372 items for KeyValue3 on: 1463184000000000 marked as no-loc
>>>>>
>>>>> You can see that in the first line 68643 items were processed for
>>>>> KeyValue3 and time 1462665600000000.
>>>>> Later on, in line 9, it seems the operation processes the same key
>>>>> again, but it reports that no Config was available for these Records.
>>>>> Line 10 shows that they've been marked as no-loc.
>>>>>
>>>>> Line 2 says that there were no items for KeyValue3 and time
>>>>> 1463184000000000, but in line 11 you can read that the items for this
>>>>> (key, day) pair were processed later and lacked a Config.
>>>>>
>>>>> *Work-around (after more testing, doesn't work, staying with Tuple2)*
>>>>>
>>>>> I've switched from using Tuple2 to a Protocol Buffer message:
>>>>>
>>>>> message KeyDay {
>>>>>   optional bytes key = 1;  // exposed as ByteString in the generated Java code
>>>>>   optional int64 timestamp_usec = 2;
>>>>> }
>>>>>
>>>>> But using Tuple2.of() was just easier than:
>>>>> KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build().
>>>>>
>>>>> // The original description comes from:
>>>>> http://stackoverflow.com/questions/37473682/items-not-groupped-correctly-cogroupbykey
>>>>>
>>>>
>>>
>>
>
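ConvertToItem's code isn't included in the thread, so the following is only a guess at what its aggregators count. For each grouped (key, day) result, it adds the Records to item count. It then flags the result under missing val1 if it has no Config, or under multiple val1 if it has several. JoinCounters and observe are hypothetical names, and counting results rather than Records for the two val1 counters is an assumption. The sketch shows why these totals only partly detect a split group. A fragment that arrives without the Config (like lines 9-10 of the log) is counted as missing, but fragments that each carry a copy of the Config pass unnoticed, as Pawel notes for the generated dataset.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class JoinCounters {
    public long itemCount;     // Records seen across all grouped results
    public long missingVal1;   // grouped results with Records but no Config (assumed unit)
    public long multipleVal1;  // grouped results with more than one Config

    /** Updates the counters for one grouped (key, day) result. */
    public void observe(List<?> records, List<?> configs) {
        itemCount += records.size();
        if (configs.isEmpty() && !records.isEmpty()) {
            missingVal1++;
        } else if (configs.size() > 1) {
            multipleVal1++;
        }
    }

    public static void main(String[] args) {
        JoinCounters c = new JoinCounters();
        // One (key, day) wrongly split into two results: only the first received the Config.
        c.observe(Arrays.asList("r1", "r2"), Arrays.asList("cfg"));
        c.observe(Arrays.asList("r3"), Collections.emptyList());
        System.out.println("item count: " + c.itemCount
                + ", missing val1: " + c.missingVal1
                + ", multiple val1: " + c.multipleVal1);
    }
}
```

Running main prints "item count: 3, missing val1: 1, multiple val1: 0". That is the output for one (key, day) split into two fragments, only one of which carries the Config.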
