Hi,
which version of Beam/Flink are you using?

Could you maybe also provide example data and code that showcase the
problem? If you have concerns about sending it to a public list, you can
also send it to me directly.

Cheers,
Aljoscha

On Fri, 27 May 2016 at 20:53 Pawel Szczur <[email protected]> wrote:

> *Data description.*
>
> I have two datasets.
>
> Records - the first contains around 0.5-1M records per (key, day). For
> testing I use 2-3 keys and 5-10 days of data. What I'm aiming for is 1000+
> keys. Each record contains a key, a timestamp in microseconds and some
> other data.
> Configs - the second is rather small. It describes the key over time, e.g.
> you can think of it as a list of tuples: (key, start date, end date,
> description).
>
> For the exploration I've encoded the data as files of length-prefixed,
> binary-encoded Protocol Buffer messages. Additionally, the files are
> compressed with gzip. The data is sharded by date; each file is around 10MB.
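>
> Roughly, reading one shard back looks like this (assuming the messages were
> written with protobuf's writeDelimitedTo, i.e. a varint length prefix;
> "Record" and the file name are placeholders, not the real class/path):
>
> import java.io.FileInputStream;
> import java.io.InputStream;
> import java.util.zip.GZIPInputStream;
>
> // Decompress the shard and read length-prefixed messages until EOF.
> try (InputStream in =
>     new GZIPInputStream(new FileInputStream("records-2016-05-08.pb.gz"))) {
>   Record record;  // hypothetical generated protobuf class
>   while ((record = Record.parseDelimitedFrom(in)) != null) {
>     // feed the record into the pipeline input
>   }
> }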
>
> *Pipeline*
>
> First I add keys to both datasets. For the Records dataset the key is (key,
> timestamp rounded down to the day). For Configs the key is (key, day), where
> day is every midnight timestamp between the start date and the end date.
> The datasets are merged using CoGroupByKey (rough sketch below).
>
> As the key type I use org.apache.flink.api.java.tuple.Tuple2 with a
> Tuple2Coder from this repo.
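>
> To make the shape concrete, the keying and join look roughly like this
> (Record/Config and their accessors stand in for my actual proto classes,
> records/configs are the input PCollections, the Tuple2Coder registration is
> omitted, and the exact DoFn syntax depends on the SDK version):
>
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.join.CoGbkResult;
> import org.apache.beam.sdk.transforms.join.CoGroupByKey;
> import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.beam.sdk.values.TupleTag;
> import org.apache.flink.api.java.tuple.Tuple2;
>
> final TupleTag<Record> recordTag = new TupleTag<>();
> final TupleTag<Config> configTag = new TupleTag<>();
> final long DAY_USEC = 24L * 60 * 60 * 1_000_000L;
>
> // Key each Record by (key, timestamp rounded down to midnight).
> PCollection<KV<Tuple2<String, Long>, Record>> keyedRecords = records.apply(
>     "KeyRecords", ParDo.of(new DoFn<Record, KV<Tuple2<String, Long>, Record>>() {
>       @ProcessElement
>       public void processElement(ProcessContext c) {
>         Record r = c.element();
>         long day = r.getTimestampUsec() - (r.getTimestampUsec() % DAY_USEC);
>         c.output(KV.of(Tuple2.of(r.getKey(), day), r));
>       }
>     }));
>
> // Expand each Config to one (key, day) entry per midnight in [start, end].
> PCollection<KV<Tuple2<String, Long>, Config>> keyedConfigs = configs.apply(
>     "KeyConfigs", ParDo.of(new DoFn<Config, KV<Tuple2<String, Long>, Config>>() {
>       @ProcessElement
>       public void processElement(ProcessContext c) {
>         Config cfg = c.element();
>         for (long day = cfg.getStartUsec(); day <= cfg.getEndUsec(); day += DAY_USEC) {
>           c.output(KV.of(Tuple2.of(cfg.getKey(), day), cfg));
>         }
>       }
>     }));
>
> // Join both sides on the Tuple2 key.
> PCollection<KV<Tuple2<String, Long>, CoGbkResult>> joined =
>     KeyedPCollectionTuple.of(recordTag, keyedRecords)
>         .and(configTag, keyedConfigs)
>         .apply(CoGroupByKey.create());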
>
> *The problem*
>
> If the Records dataset is tiny, e.g. 5 days, everything seems fine (see
> normal_run.log).
>
>  INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
>  INFO [main] (FlinkPipelineRunner.java:127) - item count : 4322332
>  INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 0
>  INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>
> When I run the pipeline against 10+ days, I encounter an error indicating
> that for some Records there is no Config (see wrong_run.log).
>
>  INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
>  INFO [main] (FlinkPipelineRunner.java:127) - item count : 8577197
>  INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 6
>  INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
>
> Then I added some extra log messages:
>
> (ConvertToItem.java:144) - 68643 items for KeyValue3 on: 1462665600000000
> (ConvertToItem.java:140) - no items for KeyValue3 on: 1463184000000000
> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462924800000000
> (ConvertToItem.java:142) - 753707 items for KeyValue3 on: 1462924800000000
> marked as no-loc
> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462752000000000
> (ConvertToItem.java:142) - 749901 items for KeyValue3 on: 1462752000000000
> marked as no-loc
> (ConvertToItem.java:144) - 754578 items for KeyValue3 on: 1462406400000000
> (ConvertToItem.java:144) - 751574 items for KeyValue3 on: 1463011200000000
> (ConvertToItem.java:123) - missing for KeyValue3 on: 1462665600000000
> (ConvertToItem.java:142) - 754758 items for KeyValue3 on: 1462665600000000
> marked as no-loc
> (ConvertToItem.java:123) - missing for KeyValue3 on: 1463184000000000
> (ConvertToItem.java:142) - 694372 items for KeyValue3 on: 1463184000000000
> marked as no-loc
>
> You can see that in the first line 68643 items were processed for KeyValue3
> and time 1462665600000000.
> Later, in line 9, the operation seems to process the same key again,
> but it reports that no Config was available for these Records.
> Line 10 says they've been marked as no-loc.
>
> Line 2 says that there were no items for KeyValue3 and time
> 1463184000000000, but in line 11 you can read that the items for this
> (key, day) pair were processed later and lacked a Config.
>
> *Work-around (after more testing, doesn't work, staying with Tuple2)*
>
> I've switched from using Tuple2 to a Protocol Buffer message:
>
> message KeyDay {
>   optional bytes key = 1;
>   optional int64 timestamp_usec = 2;
> }
>
> But using Tuple2.of() was just easier than:
> KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build().
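>
> Roughly, the keying side changed like this (the helper name and coder wiring
> are illustrative; ProtoCoder comes from the Beam protobuf extension and its
> exact class depends on the SDK version, and recordCoder stands in for
> whatever coder the Records use):
>
> import com.google.protobuf.ByteString;
> import org.apache.beam.sdk.coders.KvCoder;
> import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
>
> // Illustrative helper for building the protobuf-based key.
> static KeyDay keyDayOf(ByteString key, long dayUsec) {
>   return KeyDay.newBuilder()
>       .setKey(key)
>       .setTimestampUsec(dayUsec)
>       .build();
> }
>
> // Group-by-key keys need a deterministic coder, hence the protobuf coder.
> keyedRecords.setCoder(KvCoder.of(ProtoCoder.of(KeyDay.class), recordCoder));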
>
> // The original description comes from:
> http://stackoverflow.com/questions/37473682/items-not-groupped-correctly-cogroupbykey
>
