Hi Sidney,

you might recheck your first message. Either it's incorrectly written or
you are a victim of a fallacy.

With 1 slot, you have 1.6K events per slot = 1.6K overall.
With parallelism 5, you have 1.2K events per slot, so 6K overall. That's a
decent speedup.
With 10, you still have 6K overall.

So you haven't experienced any performance degradation (what your title
suggests). It's rather that you hit a practical scale-up/out boundary.

Now of course, you'd like to see your system to scale beyond that 6K into
the realm of 45k per second and I can assure you that it's well possible in
your setup. However, we need to figure out why it's not doing it.

The most likely reason that would explain the behavior is indeed data skew.
Your observation also matches it: even though you distribute your job, some
slots are doing much more work than other slots.

So the first thing that you should do is to plot a histogram over country
codes. What you will likely see is that 20% of all records belong to the
same country (US?). That's where your practical scale-up boundary comes
from. Since you group by country, there is no way to calculate it in a
distributed manner. Also check in Flink Web UI which tasks bottlenecks. I'm
assuming it's the window operator (or rather everything after HASH) for now.

Btw concerning hash collisions: just because you have in theory some
26^2=676 combinations in a 2-letter ASCII string, you have <200 countries =
unique values. Moreover, two values with the same hash is very common as
the hash is remapped to your parallelism. So if your parallelism is 5, you
have only 5 hash buckets where you need to put in 40 countries on average.
Let's assume you have US, CN, UK as your countries with most entries and a
good hash function remapped to 5 buckets, then you have 4% probability of
having them all assigned to the same bucket, but almost 60% of two of them
being in the same bucket.

Nevertheless, even without collisions your scalability is limited by the
largest country. That's independent of the used system and inherent to your
query. So if you indeed see this data skew, then the best way is to modify
the query. Possible options:
- You use a more fine-grain key (country + state). That may not be possible
due to semantics.
- You use multiple aggregation steps (country + state), then country.
Preaggregations are always good to have.
- You can reduce data volume by filtering before HASH. (You already have a
filter, so I'm guessing it's not a valid option)
- You preaggregate per Kafka partition key before HASH.

If you absolutely cannot make the aggregations more fine-grain, you need to
use machines that have strong CPU slots. (it's also no use to go beyond
parallelism of 10)

I also noticed that you have several forward channels. There is usually no
need for them. Task chaining is much faster. Especially if you
enableObjectReuse [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html


On Tue, Nov 3, 2020 at 3:14 PM Sidney Feiner <sidney.fei...@startapp.com>
wrote:

> Hey 🙂
>
>
>    1. I have 150 partitions in the kafka topic
>    2. I'll check that soon but why doesn't the same happen when I use a
>    smaller parallelism? If that was the reason, I'd expect the same behavior
>    also if I had a parallelism of 5. How does the increase in parallelism,
>    decrease the throughput per slot?
>    3. When I don't use a window function, it handles around 3K+ events
>    per second per slot, so that shouldn't be the problem
>    4. Tried this one right now, and the througput remains 600 events per
>    second per slot when parallelism is set to 15
>
>
> Out of all those options, seems like I have to investigate the 2nd one.
> The key is a 2-character string representing a country so I don't think
> it's very probable for 2 different countries to have the same hash, but I
> know for a fact that the number of events is not evenly distributed between
> countries.
>
> But still, why does the impact in performance appear only for higher
> parallelism?
>
>
> *Sidney Feiner* */* Data Platform Developer
> M: +972.528197720 */* Skype: sidney.feiner.startapp
>
> [image: emailsignature]
>
> ------------------------------
> *From:* Arvid Heise <ar...@ververica.com>
> *Sent:* Tuesday, November 3, 2020 12:09 PM
> *To:* Yangze Guo <karma...@gmail.com>
> *Cc:* Sidney Feiner <sidney.fei...@startapp.com>; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Increase in parallelism has very bad impact on performance
>
> Hi Sidney,
>
> there could be a couple of reasons where scaling actually hurts. Let's
> include them one by one.
>
> First, you need to make sure that your source actually supports scaling.
> Thus, your Kafka topic needs at least as many partitions as you want to
> scale. So if you want to scale at some point to 66 parallel instances. Your
> kafka topic must have at least 66 partitions. Ofc, you can also read from
> less partitions, but then some source subtasks are idling. That's valid if
> your downstream pipeline is much more resource intensive. Also note that
> it's really hard to increase the number of Kafka partitions later, so
> please plan accordingly.
>
> Second, you have a Windowing operation that uses hashes. It's really
> important to check if the hashes are evenly distributed. So you first could
> have an issue that most records share the same key, but you could on top
> have the issue that different keys share the same hash. In these cases,
> most records are processed by the same subtask resulting in poor overall
> performance. (You can check for data skew incl. hash skew in Web UI).
>
> Third, make sure that there is actually enough data to be processed. Does
> your topic contain enough data? If you want to process historic data, did
> you choose the correct consumer setting? Can your Kafka cluster provide
> enough data to the Flink job? If your max data rate is 6k records from
> Kafka, then ofc the per slot throughput decreases on scaling up.
>
> Fourth, if you suspect that the clumping of used slots to one task manager
> may be an issue, try out the configuration cluster-evenly-spread-out-slots
> [1]. The basic idea is to use a TM fully first to allow easier scale-in.
> However, if for some reason your TM is more quickly saturated than it has
> slots, you may try to spread evenly. However, you may also want to check if
> you declare too many slots for each TM (in most cases slots = cores).
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#cluster-evenly-spread-out-slots
> .
>
>
> On Tue, Nov 3, 2020 at 4:01 AM Yangze Guo <karma...@gmail.com> wrote:
>
> Hi, Sidney,
>
> What is the data generation rate of your Kafka topic? Is it a lot
> bigger than 6000?
>
> Best,
> Yangze Guo
>
> Best,
> Yangze Guo
>
>
> On Tue, Nov 3, 2020 at 8:45 AM Sidney Feiner <sidney.fei...@startapp.com>
> wrote:
> >
> > Hey,
> > I'm writing a Flink app that does some transformation on an event
> consumed from Kafka and then creates time windows keyed by some field, and
> apply an aggregation on all those events.
> > When I run it with parallelism 1, I get a throughput of around 1.6K
> events per second (so also 1.6K events per slot). With parallelism 5, that
> goes down to 1.2K events per slot, and when I increase the parallelism to
> 10, it drops to 600 events per slot.
> > Which means that parallelism 5 and parallelism 10, give me the same
> total throughput (1.2x5 = 600x10).
> >
> > I noticed that although I have 3 Task Managers, all the all the tasks
> are run on the same machine, causing it's CPU to spike and probably, this
> is the reason that the throughput dramatically decreases. After increasing
> the parallelism to 15 and now tasks run on 2/3 machines, the average
> throughput per slot is still around 600.
> >
> > What could cause this dramatic decrease in performance?
> >
> > Extra info:
> >
> > Flink version 1.9.2
> > Flink High Availability mode
> > 3 task managers, 66 slots total
> >
> >
> > Execution plan:
> >
> >
> > Any help would be much appreciated
> >
> >
> > Sidney Feiner / Data Platform Developer
> > M: +972.528197720 / Skype: sidney.feiner.startapp
> >
> >
>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Reply via email to