Flink does not autoscale for you: the parallelism is fixed unless you enable 
reactive mode (adaptive scheduler). Max parallelism is only the upper bound 
to which you can rescale your job without losing state. The limit comes from 
Flink's key group mechanism, which is used to rescale jobs with keyed state.
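
To illustrate the key group idea, here is a minimal Python sketch. It is not 
Flink's actual code: the function names are made up for illustration, and 
crc32 is only a stand-in for the hash Flink applies to the key. The point is 
that a key's key group depends only on max parallelism, while the owning 
subtask depends on the current parallelism.

```python
import zlib

MAX_PARALLELISM = 128  # the value used in this thread


def key_group_for(key: str, max_parallelism: int = MAX_PARALLELISM) -> int:
    # Assumption: crc32 stands in for Flink's own hash of the key.
    # The key group is fixed for the lifetime of the job's state.
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def operator_index_for(key_group: int, parallelism: int,
                       max_parallelism: int = MAX_PARALLELISM) -> int:
    # Each subtask owns a contiguous range of key groups.
    return key_group * parallelism // max_parallelism


def subtask_for(key: str, parallelism: int) -> int:
    # Route a key to a subtask via its key group.
    return operator_index_for(key_group_for(key), parallelism)


if __name__ == "__main__":
    for parallelism in (2, 4, 128):
        print(parallelism, subtask_for("user-42", parallelism))
```

When rescaling, whole key groups move between subtasks and are never split, 
so with 128 key groups there is nothing left to distribute beyond a 
parallelism of 128.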

I'm not sure about the future plans for extending reactive mode beyond 
standalone mode, though.

Best,
Zhanghao Chen
________________________________
From: Erez Yaakov <erez.yaa...@niceactimize.com>
Sent: Wednesday, September 14, 2022 20:32
To: zhanghao.c...@outlook.com <zhanghao.c...@outlook.com>; 
user@flink.apache.org <user@flink.apache.org>
Subject: RE: Can flink dynamically detect high load and increase the job 
parallelism automatically?


Maybe 'automatic parallelism change' is not an accurate term for describing 
what I mean, so let me rephrase it:



Assume I submit my job with parallelism = 2 and max parallelism = 128 
(default). My expectation is that every operator of the job will actually have 
several parallel instances at run time, anywhere from 2 to 128.

Meaning, Flink starts my job with 2 instances of every operator, but once it 
realizes that a specific operator has become the bottleneck, it creates 
additional instances of this operator, up to the maximum of 128. For example, 
in my case the Kafka topic has 16 partitions. Once I increase the load and the 
consumers cannot keep up (but the downstream operators can!), I’d expect Flink 
to increase the number of parallel instances of the source operator and, if 
new TaskManager pods are required, to interact directly with k8s (I'm working 
with a native k8s deployment) in order to spin up additional pods.



Is this the expected behavior of a native k8s deployment? Or is the number of 
parallel operator instances in this mode fixed per job and never changed 
dynamically during the job's lifetime?



Thanks for your clarifications. They're really helpful!





From: zhanghao.c...@outlook.com <zhanghao.c...@outlook.com>
Sent: Tuesday, September 13, 2022 4:30 AM
To: Erez Yaakov <erez.yaa...@niceactimize.com>; user@flink.apache.org
Subject: Re: Can flink dynamically detect high load and increase the job 
parallelism automatically?




Hi Erez,



Unfortunately, autoscaling for streaming jobs is only available with reactive 
mode, which, as you've already pointed out, is still an MVP feature and only 
supports standalone mode. Some vendors (e.g. Ververica) have already shipped 
their own private implementations of Flink autoscaling, though.



Best,

Zhanghao Chen

________________________________

From: Erez Yaakov <erez.yaa...@niceactimize.com>
Sent: Monday, September 12, 2022 21:38
To: user@flink.apache.org <user@flink.apache.org>
Subject: Can flink dynamically detect high load and increase the job 
parallelism automatically?



Hi,



When running a streaming job that uses a Kafka source, is it possible (without 
reactive mode) for Flink to dynamically detect high load (high consumer lag, 
high CPU usage, …) and increase the job parallelism automatically?



I am running a Flink streaming job on an application mode cluster using native 
k8s.

My streaming job consumes messages from a Kafka topic with 16 partitions; 
parallelism.default is set to 2, and no parallelism is set explicitly on the 
operators/sources/sinks.



I tried sending messages to the Kafka topic at a high rate, faster than the 
job can consume them, and I saw that the consumer lag was increasing. I also 
saw in the Flink UI that the source task was turning red, indicating high 
utilization of this task.

Even though I created a high load on the job, I didn't see Flink 
automatically change the parallelism of the job to handle the high load.

Is it possible for Flink to increase the parallelism of my job (or of my 
source) dynamically based on the current load (and add task managers 
automatically)? Or is this behavior only available by using reactive mode?



For reactive mode, my understanding based on the documentation is that it is in 
MVP state, is only supported in standalone mode, and is not yet ready for 
production use.



Thanks,

Erez

Confidentiality: This communication and any attachments are intended for the 
above-named persons only and may be confidential and/or legally privileged. Any 
opinions expressed in this communication are not necessarily those of NICE 
Actimize. If this communication has come to you in error you must take no 
action based on it, nor must you copy or show it to anyone; please 
delete/destroy and inform the sender by e-mail immediately.
Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail and 
attachments are free from any virus, we advise that in keeping with good 
computing practice the recipient should ensure they are actually virus free.
