Hi, which version of Beam/Flink are you using? Could you also provide example data and code that showcase the problem? If you have concerns about sending them to a public list, you can send them to me directly.
Cheers,
Aljoscha

On Fri, 27 May 2016 at 20:53 Pawel Szczur <[email protected]> wrote:

> *Data description.*
>
> I have two datasets.
>
> Records - the first, contains around 0.5-1M records per (key,day). For
> testing I use 2-3 keys and 5-10 days of data. What I shoot for is 1000+
> keys. Each record contains key, timestamp in μ-seconds and some other data.
> Configs - the second, is rather small. It describes the key in time, e.g.
> you can think about it as a list of tuples: (key, start date, end date,
> description).
>
> For the exploration I've encoded the data as files of length-prefixed
> Protocol Buffer binary encoded messages. Additionally the files are packed
> with gzip. Data is sharded by date. Each file is around 10MB.
>
> *Pipeline*
>
> First I add keys to both datasets. For Records dataset it's (key, day
> rounded timestamp). For Configs a key is (key, day), where day is each
> timestamp value between start date and end date (pointing midnight).
> The datasets are merged using CoGroupByKey.
>
> As a key type I use import org.apache.flink.api.java.tuple.Tuple2 with a
> Tuple2Coder from this repo.
>
> *The problem*
>
> If the Records dataset is tiny like 5 days, everything seems fine (check
> normal_run.log).
>
> INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
> INFO [main] (FlinkPipelineRunner.java:127) - item count : 4322332
> INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 0
> INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>
> When I run the pipeline against 10+ days I encounter an error pointing
> that for some Records there's no Config (wrong_run.log).
>
> INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
> INFO [main] (FlinkPipelineRunner.java:127) - item count : 8577197
> INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 6
> INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>
> Then I've added some extra logging messages:
>
> (ConvertToItem.java:144) - 68643 items for KeyValue3 on: 1462665600000000
> (ConvertToItem.java:140) - no items for KeyValue3 on: 1463184000000000
> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462924800000000
> (ConvertToItem.java:142) - 753707 items for KeyValue3 on: 1462924800000000 marked as no-loc
> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462752000000000
> (ConvertToItem.java:142) - 749901 items for KeyValue3 on: 1462752000000000 marked as no-loc
> (ConvertToItem.java:144) - 754578 items for KeyValue3 on: 1462406400000000
> (ConvertToItem.java:144) - 751574 items for KeyValue3 on: 1463011200000000
> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462665600000000
> (ConvertToItem.java:142) - 754758 items for KeyValue3 on: 1462665600000000 marked as no-loc
> (ConvertToItem.java:123) - missing for KeyValue3 on: 1463184000000000
> (ConvertToItem.java:142) - 694372 items for KeyValue3 on: 1463184000000000 marked as no-loc
>
> You can spot that in first line 68643 items were processed for KeyValue3
> and time 1462665600000000.
> Later on in line 9 it seems the operation processes the same key again,
> but it reports that no Config was available for these Records.
> The line 10 informs they've been marked as no-loc.
>
> The line 2 is saying that there were no items for KeyValue3 and time
> 1463184000000000, but in line 11 you can read that the items for this
> (key,day) pair were processed later and they've lacked a Config.
>
> *Work-around (after more testing, doesn't work, staying with Tuple2)*
>
> I've switched from using Tuple2 to a Protocol Buffer message:
>
> message KeyDay {
>   optional bytes key = 1;
>   optional int64 timestamp_usec = 2;
> }
>
> But using Tuple2.of() was just easier than:
> KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build().
>
> // The original description comes from:
> http://stackoverflow.com/questions/37473682/items-not-groupped-correctly-cogroupbykey
>
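
For reference, the keying step described under *Pipeline* could be sketched roughly like this in plain Java. This is only an illustration, not the code from the pipeline: the class name DayKeys and both method names are hypothetical, days are assumed to be aligned to midnight UTC, and the end date is assumed to be inclusive. A Record gets its timestamp rounded down to the day, and a Config is expanded into one day value per midnight in its range, so both sides can produce the same (key, day) pair.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not taken from the original pipeline.
final class DayKeys {
    // Number of microseconds in one day.
    static final long USEC_PER_DAY = 24L * 60 * 60 * 1_000_000;

    // Rounds a microsecond timestamp down to midnight (UTC assumed) of the same day.
    static long floorToDayUsec(long timestampUsec) {
        return Math.floorDiv(timestampUsec, USEC_PER_DAY) * USEC_PER_DAY;
    }

    // Lists every midnight from the day of startUsec to the day of endUsec.
    // Treating the end date as inclusive is an assumption.
    static List<Long> daysBetween(long startUsec, long endUsec) {
        List<Long> days = new ArrayList<>();
        long lastDay = floorToDayUsec(endUsec);
        for (long day = floorToDayUsec(startUsec); day <= lastDay; day += USEC_PER_DAY) {
            days.add(day);
        }
        return days;
    }

    public static void main(String[] args) {
        // A Record timestamp a little after one of the midnights seen in the log.
        long recordTimestampUsec = 1462665600000000L + 123_456L;
        System.out.println(floorToDayUsec(recordTimestampUsec));

        // A Config range spanning several of the days seen in the log.
        System.out.println(daysBetween(1462665600000000L, 1462924800000000L));
    }
}
```

If both datasets derive the day this way, any (key, day) that has Records and falls inside a Config range should show up on both sides of the CoGroupByKey, which makes the "missing" lines in the log look more like a grouping issue than a keying issue.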
