Hi KristoffSC,

As Zhu Zhu explained, Flink does not currently auto-scale a Job as new
resources become available. Instead the Job must be stopped via a savepoint
and restarted with a new parallelism (the old experimental rescale CLI used to
perform this).

Making Flink reactive to new resources and auto-scaling jobs is something
I'm currently very interested in. An approach for changing Flink to support
this was previously outlined and discussed in FLINK-10407
(https://issues.apache.org/jira/browse/FLINK-10407).

/David/

On Thu, Jan 9, 2020 at 7:38 AM Zhu Zhu <reed...@gmail.com> wrote:

> Hi KristoffSC,
>
> Each task needs a slot to run. However, Flink enables slot sharing[1] by
> default so that one slot can host one parallel instance of each task in a
> job. That's why your job can start with 6 slots.
> However, different parallel instances of the same task cannot share a
> slot. That's why you need at least 6 slots to run your job.
>
> You can put tasks in different slot sharing groups via
> '.slotSharingGroup(xxx)' to force certain tasks to not share slots. This
> allows the tasks to not burden each other. However, in this way the job
> will need more slots to start.
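
The slot arithmetic described above can be sketched in plain Java. This is only an illustrative model, not Flink code: the class `SlotEstimate`, the `Task` type, and the operator names and parallelisms are all hypothetical and are not taken from KristoffSC's actual pipeline. It assumes that a slot sharing group needs as many slots as its highest task parallelism, and that separate groups do not share slots.

```java
import java.util.HashMap;
import java.util.Map;

final class SlotEstimate {

    /** A task with its parallelism and the name of its slot sharing group. */
    static final class Task {
        final String name;
        final int parallelism;
        final String group;

        Task(String name, int parallelism, String group) {
            this.name = name;
            this.parallelism = parallelism;
            this.group = group;
        }
    }

    /**
     * Within one group, a slot can host one parallel instance of each task,
     * so the group needs max(parallelism) slots. Groups are summed because
     * tasks in different groups never share a slot.
     */
    static int requiredSlots(Task... tasks) {
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (Task t : tasks) {
            maxPerGroup.merge(t.group, t.parallelism, Integer::max);
        }
        int total = 0;
        for (int p : maxPerGroup.values()) {
            total += p;
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical operators, all in the default slot sharing group.
        Task source = new Task("source", 1, "default");
        Task map = new Task("map", 6, "default");
        Task sink = new Task("sink", 2, "default");
        System.out.println(requiredSlots(source, map, sink)); // prints 6

        // Same job, but the sink is moved to its own (hypothetical) group.
        Task isolatedSink = new Task("sink", 2, "sinks");
        System.out.println(requiredSlots(source, map, isolatedSink)); // prints 8
    }
}
```

In this model, isolating the sink raises the requirement from 6 to 8 slots, which is the trade-off mentioned above: less interference between tasks, at the cost of more slots.
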
>
> So for your questions:
> #1 yes
> #2 ATM, you will need to resubmit your job with the adjusted parallelism.
> The rescale CLI was experimental and was temporarily removed [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#task-slots-and-resources
> [2]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Temporarily-remove-support-for-job-rescaling-via-CLI-action-quot-modify-quot-td27447.html
>
> Thanks,
> Zhu Zhu
>
> On Thu, Jan 9, 2020 at 1:05 AM KristoffSC <krzysiek.chmielew...@gmail.com> wrote:
>
>> Hi all,
>> I must say I'm very impressed by Flink and what it can do.
>>
>> I was trying to play around with Flink operator parallelism and
>> scalability, and I have a few questions regarding this subject.
>>
>> My setup is:
>> 1. Flink 1.9.1
>> 2. Docker Job Cluster, where each Task manager has only one task slot. I'm
>> following [1]
>> 3. env setup:
>>         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
>>         env.setParallelism(1);
>>         env.setMaxParallelism(128);
>>         env.enableCheckpointing(10 * 60 * 1000);
>>
>> Please note that I am using operator chaining here.
>>
>> My pipeline setup:
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture2.png>
>>
>>
>>
>> As you can see, I have 7 operators (a few of them were actually chained,
>> and this is ok) with different parallelism levels. This gives me 23 tasks
>> in total.
>>
>>
>> I've noticed that with the "one task manager = one task slot" approach I
>> have to have 6 task slots/task managers to be able to start this pipeline.
>>
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture1.png>
>>
>>
>> If the number of task slots is lower than 6, the job is scheduled but not
>> started.
>>
>> With 6 task slots everything works fine, and I must say that I'm very
>> impressed with the way Flink balanced data between task slots. Data was
>> distributed very evenly between operator instances/tasks.
>>
>> In this setup (7 operators, 23 tasks and 6 task slots), some task slots
>> have to be reused by more than one operator. While inspecting the UI I
>> found examples of such operators. This is what I was expecting, though.
>>
>> However, I was a little surprised after I added one additional task
>> manager (hence one new task slot).
>>
>> <
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture3.png>
>>
>>
>> After adding new resources, Flink did not rebalance/redistribute the
>> graph, so this host was sitting there doing nothing. Even after putting
>> some load on the cluster, this node was still not used.
>>
>>
>> *After doing this exercise I have a few questions:*
>>
>> 1. It seems that the number of task slots must be equal to or greater than
>> the highest parallelism used in the pipeline. In my case it was 6. When I
>> changed the parallelism of one of the operators to 7, I had to have 7 task
>> slots (task managers in my setup) to be able to even start the job.
>> Is this the case?
>>
>> 2. What can I do to use the extra node that was spawned while the job was
>> running?
>> In other words, if I see that one of my nodes has too much load, what can
>> I do? Please note that I'm using a keyBy/hashing function in my pipeline,
>> and in my tests I had around 5000 unique keys.
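
For context on how a keyBy workload spreads when the parallelism changes: Flink hashes each key into one of `maxParallelism` key groups (128 in the setup above), and each parallel subtask owns a contiguous range of key groups. The sketch below is a plain-Java model of that range assignment, not Flink code. The class and method names are hypothetical, and the index formula reflects my understanding of Flink's key-group assignment, so treat it as an assumption rather than a reference.

```java
import java.util.Arrays;

final class KeyGroupSketch {

    /** Subtask index that owns a key group (assumed to mirror Flink's range assignment). */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Counts how many key groups each subtask owns for a given parallelism. */
    static int[] keyGroupsPerSubtask(int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
            counts[operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // maxParallelism = 128 as in the env setup above; parallelism 6, then 7.
        System.out.println(Arrays.toString(keyGroupsPerSubtask(128, 6)));
        // prints [22, 21, 21, 22, 21, 21]
        System.out.println(Arrays.toString(keyGroupsPerSubtask(128, 7)));
        // prints [19, 18, 18, 19, 18, 18, 18]
    }
}
```

Under this model, restarting with parallelism 7 instead of 6 lowers the largest share from 22 to 19 key groups per subtask. How much load that removes in practice depends on how the roughly 5000 keys fall into the 128 key groups.
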
>>
>> I tried to use the REST API to call "rescale", but I got this response:
>> /302{"errors":["Rescaling is temporarily disabled. See FLINK-12312."]}/
>>
>> Thanks.
>>
>> [1]
>>
>> https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md
>>
>
