[ 
https://issues.apache.org/jira/browse/FLINK-33668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Xin updated FLINK-33668:
------------------------------
    Description: 
With FLINK-30469 and FLINK-31643, we have decoupled the shuffle network memory 
from the parallelism of tasks by limiting the number of buffers for each 
InputGate and ResultPartition in Hybrid Shuffle. However, when too many shuffle 
tasks run simultaneously on the same TaskManager, "Insufficient number of 
network buffers" errors can still occur. This usually happens when Slot 
Sharing Group is enabled or a TaskManager contains multiple slots.

We want to make sure that the TaskManager does not encounter "Insufficient 
number of network buffers" even if there are dozens of InputGates and 
ResultPartitions running on the same TaskManager simultaneously. I have given 
this some thought, and here is my rough proposal.

1. InputGate and ResultPartition request buffers only from their 
LocalBufferPool, which means that InputGate will no longer request exclusive 
buffers from NetworkBufferPool directly.
2. When creating a LocalBufferPool (LBP), we need to specify the maximum, 
minimum, and expected number of buffers. Whenever an LBP is created or 
destroyed, a redistribution occurs that adjusts the buffer count of every 
LocalBufferPool, using the expected value as a weight and keeping each count 
between that pool's minimum and maximum values. According to testing, the 
minimum value can be set to 4, which keeps the Flink job working, possibly at 
lower performance. With this minimum value, a task with 20 shuffle edges needs 
only 5MB of memory to avoid the insufficient network buffers error.
3. At runtime, InputGate and ResultPartition each size their internal data 
structures, such as the exclusive buffer queue of InputGate and the 
BufferAccumulator of ResultPartition, based on the pool size of their 
corresponding LocalBufferPool.
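
To make the redistribution in step 2 concrete, here is a minimal sketch of one 
possible weighted allocation. It is only an illustration under stated 
assumptions, not Flink's actual NetworkBufferPool code: the class name 
BufferRedistributionSketch, the method redistribute, and the round-based 
strategy are all hypothetical. Every pool first receives its minimum; the 
surplus is then handed out in proportion to each pool's expected value, capped 
at its maximum.

```java
/** Hypothetical sketch of the proposed redistribution; not Flink's real code. */
final class BufferRedistributionSketch {

    /**
     * Computes a buffer count for every pool.
     *
     * @param min      minimum buffers per pool (always granted)
     * @param expected expected buffers per pool, used only as a weight (assumed >= 1)
     * @param max      maximum buffers per pool (never exceeded)
     * @param total    buffers available in the shared network buffer pool
     */
    static int[] redistribute(int[] min, int[] expected, int[] max, int total) {
        int n = min.length;
        int[] sizes = new int[n];
        long remaining = total;

        // Step 1: reserve the minimum for every pool.
        for (int i = 0; i < n; i++) {
            sizes[i] = min[i];
            remaining -= min[i];
        }
        if (remaining < 0) {
            throw new IllegalStateException("Insufficient number of network buffers");
        }

        // Step 2: hand out the surplus in rounds, weighted by the expected value.
        // A pool that reaches its maximum drops out of the following rounds.
        boolean progress = true;
        while (remaining > 0 && progress) {
            progress = false;
            long totalWeight = 0;
            for (int i = 0; i < n; i++) {
                if (sizes[i] < max[i]) {
                    totalWeight += expected[i];
                }
            }
            if (totalWeight == 0) {
                break; // every pool is already at its maximum
            }
            long budget = remaining;
            for (int i = 0; i < n && remaining > 0; i++) {
                int room = max[i] - sizes[i];
                if (room <= 0) {
                    continue;
                }
                // Ceiling division so small remainders still make progress;
                // this slightly favors pools that appear earlier in the array.
                long share = (budget * expected[i] + totalWeight - 1) / totalWeight;
                long grant = Math.min(Math.min(share, room), remaining);
                sizes[i] += (int) grant;
                remaining -= grant;
                if (grant > 0) {
                    progress = true;
                }
            }
        }
        return sizes;
    }
}
```

In this sketch, two pools with a minimum of 4, a maximum of 100, and expected 
values of 1 and 3 that share 48 buffers end up with 14 and 34 buffers. If the 
minimums alone exceed the total, the sketch fails with the same "Insufficient 
number of network buffers" message that the proposal aims to make rare.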

  was:
With FLINK-30469  and FLINK-31643, we have decoupled the shuffle network memory 
and the parallelism of tasks by limiting the number of buffers for each 
InputGate and ResultPartition in Hybrid Shuffle. However, when too many shuffle 
tasks are running simultaneously on the same TaskManager, "Insufficient number 
of network buffers" errors would still occur. This usually happens when Slot 
Sharing Group is enabled or a TaskManager contains multiple slots.

We want to make sure that the TaskManager does not encounter "Insufficient 
number of network buffers" even if there are dozens of InputGates and 
ResultPartitions running on the same TaskManager simultaneously.


> Decoupling Shuffle network memory and job topology
> --------------------------------------------------
>
>                 Key: FLINK-33668
>                 URL: https://issues.apache.org/jira/browse/FLINK-33668
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>            Reporter: Jiang Xin
>            Priority: Major
>             Fix For: 1.19.0


