Allen Wang created FLINK-32554:
----------------------------------
Summary: Facilitate slot isolation and resource management for global committer
Key: FLINK-32554
URL: https://issues.apache.org/jira/browse/FLINK-32554
Project: Flink
Issue Type: Improvement
Affects Versions: 1.16.2
Reporter: Allen Wang
Flink's global committer runs a workload that is quite different from that of the
source and sink operators. In some use cases, it may need far more resources (CPU,
memory) than the other operators. However, according to this [source code|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/StandardSinkTopologies.java],
it is currently not possible to isolate the global committer on a dedicated task
manager or task slot, or to assign more resources to it through fine-grained
resource management. Flink always places the global committer task in a slot that
it shares with another task. (In one test, we configured one more task slot than
the source/sink parallelism requires, but Flink still placed the global committer
in a slot shared with another task.)
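The slot arithmetic behind that test can be sketched with a simplified model. It assumes Flink's documented slot-sharing rule: operators in the same slot sharing group can share slots, so each group needs as many slots as its highest operator parallelism, and the job needs the sum across groups. The `SlotEstimate` class, the `Operator` record, `requiredSlots`, the operator names, and the parallelism of 4 are all hypothetical illustrations; the model counts slots only and does not reproduce Flink's scheduler or its placement decisions.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified slot-count model (illustrative only, not Flink's scheduler):
// operators in the same slot sharing group can share slots, so each group needs
// as many slots as its highest operator parallelism, and the job needs the sum
// of those values across all groups.
public class SlotEstimate {

    // Hypothetical operator description: name, parallelism, slot sharing group.
    record Operator(String name, int parallelism, String slotSharingGroup) {}

    // Returns the number of slots the job needs under the model above.
    static int requiredSlots(List<Operator> operators) {
        Map<String, Integer> maxPerGroup = new LinkedHashMap<>();
        for (Operator op : operators) {
            // Keep the highest parallelism seen so far for this group.
            maxPerGroup.merge(op.slotSharingGroup(), op.parallelism(), Integer::max);
        }
        int total = 0;
        for (int slots : maxPerGroup.values()) {
            total += slots;
        }
        return total;
    }

    public static void main(String[] args) {
        // Every operator in the "default" group: the global committer
        // (parallelism 1) does not raise the slot count above 4.
        List<Operator> shared = List.of(
                new Operator("kafka-source", 4, "default"),
                new Operator("sink-writer", 4, "default"),
                new Operator("global-committer", 1, "default"));

        // Hypothetical isolated layout: the global committer in its own group.
        List<Operator> isolated = List.of(
                new Operator("kafka-source", 4, "default"),
                new Operator("sink-writer", 4, "default"),
                new Operator("global-committer", 1, "global-committer"));

        System.out.println("shared layout:   " + requiredSlots(shared) + " slots");
        System.out.println("isolated layout: " + requiredSlots(isolated) + " slots");
    }
}
```

Running `main` prints 4 slots for the shared layout and 5 for the isolated one. Under this model, a fifth slot is only needed, and therefore only useful, if the global committer belongs to its own slot sharing group, which matches the observation that adding a spare slot alone did not isolate it.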
As a result, we often see CPU utilization spike on the task manager that runs the
global committer, relative to the other task managers, and that task manager
becomes the bottleneck for the job. Because of slot sharing and inadequate
resources for the global committer, the job takes a long time to initialize after
a restart, and checkpoints take a long time to complete. Our job consumes from
Kafka, and this bottleneck causes a significant increase in consumer lag. The lag
in turn causes the Kafka source operator to replay the backlog, which consumes
more CPU on the source operator and makes things worse for the global committer
running in the same task slot.
At a minimum, we want the ability to configure the global committer to run in its
own task slot, and to have that work under reactive scaling. It would also be
great to make fine-grained resource management work for the global committer.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)