Hi,
I'm trying to understand the performance impact of using GroupByKey in a
pipeline running on the Flink runner.
I compared two simple pipelines:
Pipeline1: read from Kafka -> DoFn { once per second, log the number of
messages that arrived }
Pipeline2: read from Kafka -> global window + GroupByKey -> DoFn { once per
second, log the number of messages that arrived }
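To make the measurement concrete, here is a minimal sketch of the kind of once-per-second counting described above. It is plain Java without Beam; the class name RateCounter and its methods are made up for illustration, and timestamps are passed in explicitly so the behavior is deterministic (in a real DoFn they would presumably come from System.nanoTime()).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Beam API): counts messages and reports once per second. */
class RateCounter {
    private static final long ONE_SECOND_NANOS = 1_000_000_000L;

    private long intervalStartNanos;
    private long count;
    // Kept only so the reported values can be inspected; a DoFn would just log.
    private final List<Long> reported = new ArrayList<>();

    RateCounter(long startNanos) {
        this.intervalStartNanos = startNanos;
    }

    /** Records one message observed at nowNanos. */
    void record(long nowNanos) {
        // If at least one second has passed, report the finished interval first.
        if (nowNanos - intervalStartNanos >= ONE_SECOND_NANOS) {
            flush(nowNanos);
        }
        count++;
    }

    private void flush(long nowNanos) {
        reported.add(count);
        System.out.println("messages in last interval: " + count);
        count = 0;
        intervalStartNanos = nowNanos;
    }

    List<Long> reported() {
        return reported;
    }

    public static void main(String[] args) {
        // Simulated arrival times in nanoseconds (assumed values for the demo).
        RateCounter counter = new RateCounter(0L);
        long[] arrivals = {100L, 200L, 1_000_000_000L, 1_500_000_000L, 2_000_000_000L};
        for (long t : arrivals) {
            counter.record(t);
        }
    }
}
```

A counter like this only logs when a message arrives, so an idle second produces no log line. In Pipeline2 the DoFn receives one KV<String, Iterable<MyClass>> per firing, so it would need to call record once per element of the Iterable in order to count messages rather than groups.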
This is the GroupByKey command:
PCollection<KV<String, Iterable<MyClass>>> shuffledInput = origInput
    .apply("groupByKey", Window.<KV<String, MyClass>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.ZERO) // late messages are dropped
        .discardingFiredPanes())
    .apply(GroupByKey.create());
Configuration:
Parallelism=3, 1 task slot per task manager (results in 3 task managers)
fasterCopy=true
metrics are disabled
The input Kafka topic has 3 partitions
The results: in pipeline1, each task manager processed 10 times as many
messages per second as in pipeline2 (450K vs. 45K). In the Flink UI, all 3 task
managers in pipeline2 were reported as 100% busy on the GroupByKey stage.
I tried running the pipelines with 3 keys, 1,000 keys and 100,000 keys. I also
tried different triggering options for the window. The performance results were
similar in all cases.
Any idea what could cause the performance difference? Am I missing something?
Thanks,
Ifat