Hey 🙂

  1.  I have 150 partitions in the Kafka topic
  2.  I'll check that soon, but why doesn't the same happen when I use a smaller
parallelism? If that were the reason, I'd expect the same behavior with a
parallelism of 5 as well. How does the increase in parallelism decrease the
throughput per slot?
  3.  When I don't use a window function, it handles around 3K+ events per
second per slot, so that shouldn't be the problem
  4.  Tried this one just now, and the throughput remains 600 events per second
per slot when parallelism is set to 15

Out of all those options, it seems I have to investigate the 2nd one. The key
is a 2-character string representing a country, so I don't think it's very
likely for 2 different countries to have the same hash, but I know for a fact
that the number of events is not evenly distributed between countries.
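
As a first check, I'll replay the key assignment offline with Flink's own
KeyGroupRangeAssignment (the same mapping the runtime uses). This is just a
sketch: the per-country counts are made up, and the max parallelism of 128 is
Flink's default, not something I've verified for our job:

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    import java.util.HashMap;
    import java.util.Map;

    public class CountrySkewCheck {
        public static void main(String[] args) {
            // Made-up event counts per country code; the real numbers would
            // come from our data.
            Map<String, Long> eventsPerCountry = new HashMap<>();
            eventsPerCountry.put("US", 500_000L);
            eventsPerCountry.put("IN", 300_000L);
            eventsPerCountry.put("DE", 80_000L);
            eventsPerCountry.put("FR", 60_000L);
            eventsPerCountry.put("IL", 20_000L);

            int maxParallelism = 128; // Flink's default unless configured
            for (int parallelism : new int[]{5, 10, 15}) {
                Map<Integer, Long> loadPerSubtask = new HashMap<>();
                for (Map.Entry<String, Long> e : eventsPerCountry.entrySet()) {
                    // Which subtask would this country's events land on?
                    int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                            e.getKey(), maxParallelism, parallelism);
                    loadPerSubtask.merge(subtask, e.getValue(), Long::sum);
                }
                System.out.println("parallelism " + parallelism + " -> " + loadPerSubtask);
            }
        }
    }

If the hottest subtask carries roughly the same absolute load no matter how
many subtasks there are, the total throughput stays capped and the per-slot
average drops exactly the way I'm seeing.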

But still, why does the performance impact only appear at higher parallelism?



Sidney Feiner / Data Platform Developer
M: +972.528197720 / Skype: sidney.feiner.startapp



________________________________
From: Arvid Heise <ar...@ververica.com>
Sent: Tuesday, November 3, 2020 12:09 PM
To: Yangze Guo <karma...@gmail.com>
Cc: Sidney Feiner <sidney.fei...@startapp.com>; user@flink.apache.org 
<user@flink.apache.org>
Subject: Re: Increase in parallelism has very bad impact on performance

Hi Sidney,

there are a couple of reasons why scaling can actually hurt. Let's go through
them one by one.

First, you need to make sure that your source actually supports scaling: your
Kafka topic needs at least as many partitions as the parallelism you want to
scale to. So if you want to scale at some point to 66 parallel instances, your
Kafka topic must have at least 66 partitions. Of course, you can also read
from fewer partitions, but then some source subtasks are idling. That's valid
if your downstream pipeline is much more resource intensive. Also note that
it's really hard to increase the number of Kafka partitions later, so please
plan accordingly.
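
If you want to double-check the partition count programmatically, a minimal
sketch with Kafka's AdminClient looks like this (the broker address and the
topic name "events" are placeholders):

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.TopicDescription;

    import java.util.Collections;
    import java.util.Properties;

    public class PartitionCount {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // Fetch the topic's metadata and print its partition count.
                TopicDescription desc = admin
                        .describeTopics(Collections.singletonList("events"))
                        .values().get("events").get();
                System.out.println("partitions: " + desc.partitions().size());
            }
        }
    }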

Second, you have a windowing operation that uses hashes of the keys. It's
really important to check whether those hashes are evenly distributed. You
could have the issue that most records share the same key, and on top of that,
the issue that different keys share the same hash. In either case, most
records are processed by the same subtask, resulting in poor overall
performance. (You can check for data skew, including hash skew, in the Web
UI.)
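
For intuition: Flink does not map the key hash directly to a subtask, it goes
through key groups. A simplified sketch of the assignment (mirroring what
KeyGroupRangeAssignment does, and assuming the default max parallelism of
128):

    import org.apache.flink.util.MathUtils;

    public class KeyToSubtask {
        // Simplified from Flink's KeyGroupRangeAssignment.
        static int subtaskFor(Object key, int maxParallelism, int parallelism) {
            int keyGroup = MathUtils.murmurHash(key.hashCode()) % maxParallelism;
            return keyGroup * parallelism / maxParallelism;
        }

        public static void main(String[] args) {
            // Two distinct country codes may well land on the same subtask.
            System.out.println(subtaskFor("US", 128, 15));
            System.out.println(subtaskFor("DE", 128, 15));
        }
    }

With only a couple hundred distinct keys, several keys inevitably fall into
key groups owned by the same subtask, so the load can be lumpy even if the
hashes themselves look fine.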

Third, make sure that there is actually enough data to be processed. Does your
topic contain enough data? If you want to process historic data, did you
choose the correct consumer setting? Can your Kafka cluster provide enough
data to the Flink job? If your maximum data rate out of Kafka is 6K records
per second, then of course the per-slot throughput decreases on scaling up.
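
For the consumer setting, this is the kind of thing I mean (topic, group id,
and broker address are placeholders; this goes into your job setup):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092");
    props.setProperty("group.id", "my-group");

    FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);
    // Replay the topic from the beginning instead of resuming at the
    // committed group offsets.
    consumer.setStartFromEarliest();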

Fourth, if you suspect that the clumping of the used slots onto one task
manager may be an issue, try out the configuration option
cluster.evenly-spread-out-slots [1]. The basic idea of the default is to fill
one TM fully first, which allows easier scale-in. However, if for some reason
a TM saturates before all of its slots are in use, you can try spreading the
slots out evenly instead. You may also want to check whether you declare too
many slots per TM (in most cases, slots = cores).
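
That is, in flink-conf.yaml (option name as listed in the docs linked below):

    cluster.evenly-spread-out-slots: true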

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#cluster-evenly-spread-out-slots


On Tue, Nov 3, 2020 at 4:01 AM Yangze Guo <karma...@gmail.com> wrote:
Hi, Sidney,

What is the data generation rate of your Kafka topic? Is it a lot
bigger than 6000?

Best,
Yangze Guo



On Tue, Nov 3, 2020 at 8:45 AM Sidney Feiner <sidney.fei...@startapp.com> wrote:
>
> Hey,
> I'm writing a Flink app that does some transformation on an event consumed
> from Kafka, then creates time windows keyed by some field, and applies an
> aggregation on all those events.
> When I run it with parallelism 1, I get a throughput of around 1.6K events
> per second (so also 1.6K events per slot). With parallelism 5, that goes
> down to 1.2K events per slot, and when I increase the parallelism to 10, it
> drops to 600 events per slot. This means that parallelism 5 and parallelism
> 10 give me the same total throughput (1.2K x 5 = 6K = 0.6K x 10).
>
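> Roughly, the job has this shape (the class names, key field, and window
> size here are placeholders, not the actual code):
>
>     DataStream<Event> events = env.addSource(kafkaConsumer);
>     events
>         .map(new Transform())               // transformation on each event
>         .keyBy(e -> e.getCountry())         // keyed by some field
>         .timeWindow(Time.minutes(1))        // time windows
>         .aggregate(new CountryAggregate())  // aggregation on the window
>         .addSink(sink);
>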
> I noticed that although I have 3 Task Managers, all the tasks run on the
> same machine, causing its CPU to spike, and this is probably the reason the
> throughput dramatically decreases. After increasing the parallelism to 15,
> tasks now run on 2 of the 3 machines, but the average throughput per slot
> is still around 600.
>
> What could cause this dramatic decrease in performance?
>
> Extra info:
>
> Flink version 1.9.2
> Flink High Availability mode
> 3 task managers, 66 slots total
>
>
> Execution plan:
>
>
> Any help would be much appreciated
>
>
> Sidney Feiner / Data Platform Developer
> M: +972.528197720 / Skype: sidney.feiner.startapp
>
>


--

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward<https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) 
Cheng
