It sounds like a bug in the Flink runner. I'm pretty sure Max and Aljoscha will fix that soon ;)

Regards
JB

On 05/31/2016 03:47 PM, Pawel Szczur wrote:
FYI I've opened an issue: https://issues.apache.org/jira/browse/BEAM-315

2016-05-31 14:51 GMT+02:00 Pawel Szczur <[email protected]>:

    I've also added a test for GroupByKey. It fails. That kind of makes
    the Flink runner broken at the moment, doesn't it?

    I'm wondering... could it be related to some windowing issue?

    2016-05-31 14:40 GMT+02:00 Pawel Szczur <[email protected]>:

        I've just tested it. It fails.

        Also added the test to the repo:
        https://github.com/orian/cogroup-wrong-grouping

        I reason this means that GroupByKey itself is flawed? If you open
        an official issue, please add it to the discussion.

        2016-05-31 11:55 GMT+02:00 Aljoscha Krettek <[email protected]>:

            Does 2. work for the cases where CoGroupByKey fails? The reason
            I'm asking is that CoGroupByKey is essentially implemented
            like that internally: create a tagged union -> Flatten ->
            GroupByKey.
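            For reference, a minimal sketch of that shape (MyKey, RawUnion
            and TagFn are placeholders for illustration, not the runner's
            actual classes; TagFn wraps each value in a RawUnion carrying
            its input's tag index):

                PCollection<KV<MyKey, RawUnion>> taggedRecords =
                    records.apply("TagRecords", ParDo.of(new TagFn(0)));
                PCollection<KV<MyKey, RawUnion>> taggedConfigs =
                    configs.apply("TagConfigs", ParDo.of(new TagFn(1)));

                // Flatten the tagged collections and group once by key;
                // this is roughly what CoGroupByKey expands to.
                PCollection<KV<MyKey, Iterable<RawUnion>>> grouped =
                    PCollectionList.of(taggedRecords).and(taggedConfigs)
                        .apply(Flatten.pCollections())
                        .apply(GroupByKey.create());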

            On Tue, 31 May 2016 at 01:16 Pawel Szczur
            <[email protected]> wrote:

                I've naively tried a few other key types; the problem
                seems unrelated to the key type.

                For now I have two workarounds, plus plain ignorance:
                  1. If there is one dominant dataset and the other
                datasets are small (size << GB), I use a side input (see
                the sketch below).
                  2. If I have multiple datasets of similar size, I
                enclose them in a common container, flatten them and
                GroupByKey.
                  3. I measure the occurrences and ignore the bug for now.
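                A minimal sketch of workaround 1, assuming the small
                dataset fits in memory (MyKey, Record, Config and Output
                are placeholder types, not names from the repo):

                    final PCollectionView<Map<MyKey, Config>> configsView =
                        configs.apply(View.asMap());

                    records.apply(ParDo.of(
                        new DoFn<KV<MyKey, Record>, Output>() {
                          @ProcessElement
                          public void processElement(ProcessContext c) {
                            // Look up the Config for this record's key in
                            // the broadcast map instead of co-grouping.
                            Config cfg = c.sideInput(configsView)
                                .get(c.element().getKey());
                            // ... emit the record enriched with cfg ...
                          }
                        }).withSideInputs(configsView));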

                Do you have an idea of how a test for this could be
                constructed? It would be handy, I think.

                I also found two things that may help you:
                  1. the issue doesn't appear without parallelism
                  2. the issue doesn't appear with tiny datasets

                2016-05-30 17:13 GMT+02:00 Aljoscha Krettek
                <[email protected]>:

                    You're right. I'm still looking into this;
                    unfortunately I haven't made progress so far. I'll
                    keep you posted.

                    On Sun, 29 May 2016 at 18:20 Pawel Szczur
                    <[email protected]> wrote:

                        Hi,

                        I used the config as in the repo.
                        Please grep the log for
                        "hereGoesLongStringID0,2"; you will see that
                        this key is processed multiple times.

                        This is how I understand CoGroupByKey: one has
                        two (or more) PCollection<KV<K, ?>>. All sets
                        are grouped by key. For each unique key a KV<K,
                        CoGbkResult> is produced, and a given CoGbkResult
                        contains all values from all input PCollections
                        which have the given key.
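                        In code, my reading of the contract (tag and type
                        names are just placeholders):

                            final TupleTag<V1> tag1 = new TupleTag<>();
                            final TupleTag<V2> tag2 = new TupleTag<>();

                            PCollection<KV<K, CoGbkResult>> joined =
                                KeyedPCollectionTuple
                                    .of(tag1, input1)
                                    .and(tag2, input2)
                                    .apply(CoGroupByKey.<K>create());

                            // Expected: exactly one KV per distinct key,
                            // whose CoGbkResult.getAll(tag1) / getAll(tag2)
                            // hold every value with that key from each input.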

                        But from the log it seems that each key produced
                        more than one CoGbkResult.

                        The final counters didn't catch the bug because,
                        in your case, the value from dataset1 was
                        replicated for each key.

                        Cheers, Pawel

                        2016-05-29 15:59 GMT+02:00 Aljoscha Krettek
                        <[email protected]>:

                            Hi,
                            I ran your data generator with these configs:

                                p.apply(Create.of(new Config(3, 5, 600_000, 1)))
                                    .apply(ParDo.of(new Generator()))
                                    .apply(AvroIO.Write.to("/tmp/dataset1")
                                        .withSchema(DumbData.class)
                                        .withNumShards(6));

                                p.apply(Create.of(new Config(3, 5, 600_000, 2)))
                                    .apply(ParDo.of(new Generator()))
                                    .apply(AvroIO.Write.to("/tmp/dataset2")
                                        .withSchema(DumbData.class)
                                        .withNumShards(6));

                            Then I ran the job with parallelism=6. I
                            couldn't reproduce the problem; this is the
                            log file from one of several runs:
                            https://gist.github.com/aljoscha/ef1d804f57671cd472c75b92b4aee51b

                            Could you please send me the exact config
                            that you used? Btw, I ran it inside an IDE;
                            do the problems also occur in the IDE for
                            you, or only when you execute on a cluster?

                            Cheers,
                            Aljoscha

                            On Sun, 29 May 2016 at 01:51 Pawel Szczur
                            <[email protected]> wrote:

                                Hi Aljoscha.

                                I've created a repo with a fake dataset
                                to let you easily reproduce the problem:
                                https://github.com/orian/cogroup-wrong-grouping

                                What I noticed: if the dataset is too
                                small, the bug doesn't appear.

                                You can modify the size of the dataset,
                                but ideally it should be a few hundred
                                thousand records per key (I guess it
                                depends on the machine you run it on).

                                Cheers, Pawel

                                2016-05-28 12:45 GMT+02:00 Aljoscha
                                Krettek <[email protected]>:

                                    Hi,
                                    which version of Beam/Flink are you
                                    using?

                                    Could you maybe also provide example
                                    data and code that showcase the
                                    problem? If you have concerns about
                                    sending them to a public list you can
                                    also send them to me directly.

                                    Cheers,
                                    Aljoscha

                                    On Fri, 27 May 2016 at 20:53 Pawel
                                    Szczur <[email protected]> wrote:

                                        *Data description.*

                                        I have two datasets.

                                        Records - the first - contains
                                        around 0.5-1M records per
                                        (key, day). For testing I use 2-3
                                        keys and 5-10 days of data; what
                                        I aim for is 1000+ keys. Each
                                        record contains a key, a timestamp
                                        in microseconds and some other data.
                                        Configs - the second - is rather
                                        small. It describes a key over
                                        time; e.g. you can think of it as
                                        a list of tuples: (key, start
                                        date, end date, description).

                                        For the exploration I've encoded
                                        the data as files of
                                        length-prefixed, binary-encoded
                                        Protocol Buffer messages.
                                        Additionally, the files are
                                        compressed with gzip. Data is
                                        sharded by date; each file is
                                        around 10MB.
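                                        If it helps, a sketch of that
                                        encoding, assuming protobuf's
                                        varint-delimited framing and a
                                        placeholder MyRecord message:

                                            try (OutputStream out =
                                                new GZIPOutputStream(
                                                    new FileOutputStream(f))) {
                                              for (MyRecord r : shard) {
                                                // varint length prefix,
                                                // then the message bytes
                                                r.writeDelimitedTo(out);
                                              }
                                            }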

                                        *Pipeline*

                                        First I add keys to both
                                        datasets. For the Records dataset
                                        the key is (key, day-rounded
                                        timestamp). For Configs the key
                                        is (key, day), where day is each
                                        timestamp value between the start
                                        date and the end date (pointing
                                        to midnight).
                                        The datasets are merged using
                                        CoGroupByKey.

                                        As the key type I use
                                        org.apache.flink.api.java.tuple.Tuple2
                                        with a Tuple2Coder from this repo.
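                                        Roughly, the keying step for
                                        Records looks like this (Record,
                                        getTimestampUsec and DAY_USEC are
                                        placeholders, not the repo's
                                        actual names):

                                            ParDo.of(new DoFn<Record,
                                                KV<Tuple2<String, Long>, Record>>() {
                                              @ProcessElement
                                              public void processElement(ProcessContext c) {
                                                Record r = c.element();
                                                // round the timestamp down to midnight
                                                long day = r.getTimestampUsec()
                                                    - r.getTimestampUsec() % DAY_USEC;
                                                c.output(KV.of(
                                                    Tuple2.of(r.getKey(), day), r));
                                              }
                                            })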

                                        *The problem*

                                        If the Records dataset is tiny
                                        like 5 days, everything seems
                                        fine (check normal_run.log).

                                          INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
                                          INFO [main] (FlinkPipelineRunner.java:127) - item count : 4322332
                                          INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 0
                                          INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0

                                        When I run the pipeline against
                                        10+ days I encounter an error
                                        indicating that for some Records
                                        there's no Config (wrong_run.log).

                                          INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
                                          INFO [main] (FlinkPipelineRunner.java:127) - item count : 8577197
                                          INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 6
                                          INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0

                                        Then I've added some extra
                                        logging messages:

                                        (ConvertToItem.java:144) - 68643 items for KeyValue3 on: 1462665600000000
                                        (ConvertToItem.java:140) - no items for KeyValue3 on: 1463184000000000
                                        (ConvertToItem.java:123) - missing for KeyValue3 on: 1462924800000000
                                        (ConvertToItem.java:142) - 753707 items for KeyValue3 on: 1462924800000000 marked as no-loc
                                        (ConvertToItem.java:123) - missing for KeyValue3 on: 1462752000000000
                                        (ConvertToItem.java:142) - 749901 items for KeyValue3 on: 1462752000000000 marked as no-loc
                                        (ConvertToItem.java:144) - 754578 items for KeyValue3 on: 1462406400000000
                                        (ConvertToItem.java:144) - 751574 items for KeyValue3 on: 1463011200000000
                                        (ConvertToItem.java:123) - missing for KeyValue3 on: 1462665600000000
                                        (ConvertToItem.java:142) - 754758 items for KeyValue3 on: 1462665600000000 marked as no-loc
                                        (ConvertToItem.java:123) - missing for KeyValue3 on: 1463184000000000
                                        (ConvertToItem.java:142) - 694372 items for KeyValue3 on: 1463184000000000 marked as no-loc

                                        You can see that in line 1,
                                        68643 items were processed for
                                        KeyValue3 and time 1462665600000000.
                                        Later, in line 9, it seems the
                                        operation processes the same key
                                        again, but it reports that no
                                        Config was available for these
                                        Records.
                                        Line 10 informs that they've been
                                        marked as no-loc.

                                        Line 2 says that there were no
                                        items for KeyValue3 and time
                                        1463184000000000, but in line 11
                                        you can read that the items for
                                        this (key, day) pair were
                                        processed later and lacked a
                                        Config.

                                        *Work-around (after more
                                        testing, doesn't work, staying
                                        with Tuple2)*

                                        I've switched from using Tuple2
                                        to a Protocol Buffer message:

                                        message KeyDay {
                                          optional bytes key = 1;
                                          optional int64 timestamp_usec = 2;
                                        }

                                        But using Tuple2.of() was just
                                        easier than:
                                        KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build().

                                        // The original description comes from:
                                        http://stackoverflow.com/questions/37473682/items-not-groupped-correctly-cogroupbykey

--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
