It sounds like a bug in the Flink runner. I'm pretty sure Max and
Aljoscha will fix that soon ;)
Regards
JB
On 05/31/2016 03:47 PM, Pawel Szczur wrote:
FYI I've opened an issue: https://issues.apache.org/jira/browse/BEAM-315
2016-05-31 14:51 GMT+02:00 Pawel Szczur <[email protected]>:
I've also added a test for GroupByKey. It fails. That kind of makes
Flink broken at the moment, doesn't it?
I'm wondering... could it be related to some windowing issue?
2016-05-31 14:40 GMT+02:00 Pawel Szczur <[email protected]>:
I've just tested it. It fails.
Also added the test to the repo:
https://github.com/orian/cogroup-wrong-grouping
I suppose this means that GroupByKey is flawed? If you open an
official issue, please add it to the discussion.
2016-05-31 11:55 GMT+02:00 Aljoscha Krettek <[email protected]>:
Does 2. work for the cases where CoGroupByKey fails? The reason
I'm asking is that CoGroupByKey is essentially implemented
like that internally: create tagged union -> flatten ->
GroupByKey.
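A hand-rolled version of that path would look roughly like this
(just a sketch: input1/input2, the String value types and the
Integer standing in for the union tag are placeholders; the real
internals use a dedicated union value and coder):

    // Tag each input, flatten the tagged collections, then GroupByKey.
    PCollection<KV<String, KV<Integer, String>>> tagged1 = input1.apply(
        MapElements.via(
            new SimpleFunction<KV<String, String>, KV<String, KV<Integer, String>>>() {
              @Override
              public KV<String, KV<Integer, String>> apply(KV<String, String> e) {
                return KV.of(e.getKey(), KV.of(0, e.getValue())); // 0 = first input
              }
            }));
    PCollection<KV<String, KV<Integer, String>>> tagged2 = input2.apply(
        MapElements.via(
            new SimpleFunction<KV<String, String>, KV<String, KV<Integer, String>>>() {
              @Override
              public KV<String, KV<Integer, String>> apply(KV<String, String> e) {
                return KV.of(e.getKey(), KV.of(1, e.getValue())); // 1 = second input
              }
            }));
    PCollection<KV<String, Iterable<KV<Integer, String>>>> grouped =
        PCollectionList.of(tagged1).and(tagged2)
            .apply(Flatten.<KV<String, KV<Integer, String>>>pCollections())
            .apply(GroupByKey.<String, KV<Integer, String>>create());

If this hand-rolled version also mis-groups, the problem sits below
CoGroupByKey itself.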
On Tue, 31 May 2016 at 01:16 Pawel Szczur <[email protected]> wrote:
I've naively tried a few other key types; the problem seems to be
unrelated to the key type.
For now I have two workarounds, plus ignoring the problem:
1. If there is one dominant dataset and the other datasets
are small (size << GB), then I use a SideInput (rough sketch
after this list).
2. If I have multiple datasets of similar size, I wrap them in
a common container, flatten them and apply GroupByKey.
3. I measure the occurrences and ignore the bug for now.
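A rough sketch of 1. (names and value types are placeholders, and
the exact DoFn/side-input method shapes differ a bit between SDK
versions):

    // The small Configs dataset becomes a map-shaped side input and is
    // looked up per Record inside a ParDo.
    final PCollectionView<Map<String, String>> configsView =
        configs.apply(View.<String, String>asMap());

    PCollection<String> joined = records.apply(
        ParDo.of(new DoFn<KV<String, String>, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            String config = c.sideInput(configsView).get(c.element().getKey());
            c.output(c.element().getValue() + " -> " + config);
          }
        }).withSideInputs(configsView));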
Do you have an idea how a test for this could be
constructed? It would be handy, I think.
I also found two things, maybe they help you:
1. the issue doesn't appear without parallelism
2. the issue doesn't appear with tiny datasets
2016-05-30 17:13 GMT+02:00 Aljoscha Krettek <[email protected]>:
You're right. I'm still looking into this;
unfortunately I haven't made progress so far. I'll
keep you posted.
On Sun, 29 May 2016 at 18:20 Pawel Szczur <[email protected]> wrote:
Hi,
I used the config as in the repo.
Please grep the log for
"hereGoesLongStringID0,2"; you will see that
this key is processed multiple times.
This is how I understand CoGroupByKey: one has
two (or more) PCollection<KV<K,?>>. All of them
are grouped by key. For each unique key a KV<K,
CoGbkResult> is produced; a given CoGbkResult
contains all values from all input PCollections
that have that key.
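Roughly like this (simplified; Long record values and String
config values are just placeholders for my actual types):

    final TupleTag<Long> recordsTag = new TupleTag<>();
    final TupleTag<String> configsTag = new TupleTag<>();

    PCollection<KV<String, CoGbkResult>> joined =
        KeyedPCollectionTuple.of(recordsTag, records)
            .and(configsTag, configs)
            .apply(CoGroupByKey.<String>create());

    joined.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        CoGbkResult result = c.element().getValue();
        // Everything from both inputs that shares this key should arrive
        // here, in a single CoGbkResult.
        Iterable<Long> recs = result.getAll(recordsTag);
        Iterable<String> cfgs = result.getAll(configsTag);
        long n = 0;
        for (Long ignored : recs) { n++; }
        c.output(c.element().getKey() + ": " + n + " records, configs present: "
            + cfgs.iterator().hasNext());
      }
    }));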
But from the log it seems that each key produced
more than one CoGbkResult.
The final counters didn't catch the bug because
in your case, the value from dataset1 was
replicated for each key.
Cheers, Pawel
2016-05-29 15:59 GMT+02:00 Aljoscha Krettek <[email protected]>:
Hi,
I ran your data generator with these configs:
p.apply(Create.of(new Config(3, 5, 600_000, 1)))
    .apply(ParDo.of(new Generator()))
    .apply(AvroIO.Write.to("/tmp/dataset1").withSchema(DumbData.class).withNumShards(6));

p.apply(Create.of(new Config(3, 5, 600_000, 2)))
    .apply(ParDo.of(new Generator()))
    .apply(AvroIO.Write.to("/tmp/dataset2").withSchema(DumbData.class).withNumShards(6));
Then I ran the job with parallelism=6. I
couldn't reproduce the problem; this is the
log file from one of several runs:
https://gist.github.com/aljoscha/ef1d804f57671cd472c75b92b4aee51b
Could you please send me the exact config
that you used? By the way, I ran it inside an IDE;
do the problems also occur in the IDE for
you, or only when you execute on a cluster?
Cheers,
Aljoscha
On Sun, 29 May 2016 at 01:51 Pawel Szczur <[email protected]> wrote:
Hi Aljoscha.
I've created a repo with a fake dataset to
make it easy to reproduce the problem:
https://github.com/orian/cogroup-wrong-grouping
What I noticed: if the dataset is too
small, the bug doesn't appear.
You can modify the size of the dataset, but
ideally it should be a few hundred
thousand records per key (I guess it
depends on the machine you run it on).
Cheers, Pawel
2016-05-28 12:45 GMT+02:00 Aljoscha Krettek <[email protected]>:
Hi,
which version of Beam/Flink are you using?
Could you maybe also provide example
data and code that showcases the
problem? If you have concerns about
sending it to a public list you can
also send it to me directly.
Cheers,
Aljoscha
On Fri, 27 May 2016 at 20:53 Pawel Szczur <[email protected]> wrote:
*Data description.*
I have two datasets.
Records - the first dataset - contains
around 0.5-1M records per
(key, day). For testing I use 2-3
keys and 5-10 days of data. What
I aim for is 1000+ keys. Each
record contains a key, a timestamp
in microseconds and some other data.
Configs - the second dataset - is rather
small. It describes a key over
time; you can think of it as a
list of tuples: (key, start date,
end date, description).
For the exploration I've encoded
the data as files of
length-prefixed, binary-encoded
Protocol Buffer messages.
Additionally, the files are
compressed with gzip. The data is
sharded by date. Each file is
around 10MB.
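(Such a file can be read back roughly like this, assuming the
length prefix is protobuf's standard varint delimiter; Record
and its accessors are placeholders for the actual message type,
and a fixed-width prefix would need a different loop.)

    try (InputStream in =
        new GZIPInputStream(new FileInputStream("/path/to/one-shard.pb.gz"))) {
      Record r;
      // parseDelimitedFrom reads one varint-length-prefixed message per
      // call and returns null at end of stream.
      while ((r = Record.parseDelimitedFrom(in)) != null) {
        System.out.println(r.getKey() + " " + r.getTimestampUsec());
      }
    }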
*Pipeline*
First I add keys to both
datasets. For the Records dataset
the key is (key, day-rounded
timestamp). For Configs the key is
(key, day), where day is each
timestamp value between the start
date and end date (pointing to
midnight).
The datasets are merged using
CoGroupByKey.
As the key type I use
org.apache.flink.api.java.tuple.Tuple2
with a Tuple2Coder from this repo.
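The keying of the Records side looks roughly like this (a sketch;
Record and its accessors are placeholders, and the Configs side
analogously emits one (key, day) per day between start and end
date):

    final long DAY_USEC = 24L * 60 * 60 * 1_000_000L;

    PCollection<KV<Tuple2<String, Long>, Record>> keyedRecords = records.apply(
        ParDo.of(new DoFn<Record, KV<Tuple2<String, Long>, Record>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            Record r = c.element();
            // Round the microsecond timestamp down to midnight of its day.
            long day = r.getTimestampUsec() - (r.getTimestampUsec() % DAY_USEC);
            c.output(KV.of(Tuple2.of(r.getKey(), day), r));
          }
        }));
    // The Tuple2Coder is then set for the keyed collections so the
    // runner can encode the Tuple2 keys.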
*The problem*
If the Records dataset is tiny,
like 5 days, everything seems
fine (check normal_run.log).
INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
INFO [main] (FlinkPipelineRunner.java:127) - item count : 4322332
INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 0
INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
When I run the pipeline against
10+ days, I encounter an error
indicating that for some Records
there's no Config (wrong_run.log).
INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
INFO [main] (FlinkPipelineRunner.java:127) - item count : 8577197
INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 6
INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
Then I've added some extra
logging messages:
(ConvertToItem.java:144) - 68643 items for KeyValue3 on: 1462665600000000
(ConvertToItem.java:140) - no items for KeyValue3 on: 1463184000000000
(ConvertToItem.java:123) - missing for KeyValue3 on: 1462924800000000
(ConvertToItem.java:142) - 753707 items for KeyValue3 on: 1462924800000000 marked as no-loc
(ConvertToItem.java:123) - missing for KeyValue3 on: 1462752000000000
(ConvertToItem.java:142) - 749901 items for KeyValue3 on: 1462752000000000 marked as no-loc
(ConvertToItem.java:144) - 754578 items for KeyValue3 on: 1462406400000000
(ConvertToItem.java:144) - 751574 items for KeyValue3 on: 1463011200000000
(ConvertToItem.java:123) - missing for KeyValue3 on: 1462665600000000
(ConvertToItem.java:142) - 754758 items for KeyValue3 on: 1462665600000000 marked as no-loc
(ConvertToItem.java:123) - missing for KeyValue3 on: 1463184000000000
(ConvertToItem.java:142) - 694372 items for KeyValue3 on: 1463184000000000 marked as no-loc
You can spot that in the first line
68643 items were processed for
KeyValue3 and time 1462665600000000.
Later on, in line 9, it seems the
operation processes the same key
again, but it reports that no
Config was available for these
Records.
Line 10 says they've been
marked as no-loc.
Line 2 says that there
were no items for KeyValue3 and
time 1463184000000000, but in
line 11 you can read that items
for this (key, day) pair were
processed later and they lacked
a Config.
*Work-around (after more testing, doesn't work, staying with Tuple2)*
I've switched from using Tuple2
to a Protocol Buffer message:

message KeyDay {
  optional bytes key = 1;
  optional int64 timestamp_usec = 2;
}

But using Tuple2.of() was just easier than:
KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build().
// The original description
comes from:
http://stackoverflow.com/questions/37473682/items-not-groupped-correctly-cogroupbykey
--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com