[ 
https://issues.apache.org/jira/browse/FLINK-22643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17467174#comment-17467174
 ] 

fanrui edited comment on FLINK-22643 at 12/31/21, 10:22 AM:
------------------------------------------------------------

[~kevin.cyj] [~pnowojski] Of course, I can do it. So, do I need to run some 
benchmarks with different settings (1, 5, Integer.MAX_VALUE) to select the 
default value?

My test job is 
[https://github.com/1996fanrui/fanrui-learning/commit/1fcb01a50d629a8a28018d4181bdf435ab400eb1]

 

For scenarios with fewer tasks, this config may have no effect, the number of 
tasks of the test job <3 is meaningless. So my test task looks like this:

Number of Tasks == 10
parallelism = 10

When configuring 1,5, Integer.MAX_VALUE, compare TM's CPU resource usage and 
numRecordsInPerSecond metric.

The number of Tasks and parallelism can also set other values, such as 
(20,10),(30,10), etc.

Can master provide some suggestions?


was (Author: fanrui):
[~kevin.cyj] [~pnowojski] Of course, I can do it. So, do I need to run some 
benchmarks with different settings (1, 5, Integer.MAX_VALUE) to select the 
default value?

My test job is 
[https://github.com/1996fanrui/fanrui-learning/commit/fe038478b1250c97724c60bd7bc48a465fb9550a]

For scenarios with fewer tasks, this config may have no effect, the number of 
tasks of the test job <3 is meaningless. So my test task looks like this:

Number of Tasks == 10
parallelism = 10

When configuring 1,5, Integer.MAX_VALUE, compare TM's CPU resource usage and 
numRecordsInPerSecond metric.

The number of Tasks and parallelism can also set other values, such as 
(20,10),(30,10), etc.

Can master provide some suggestions?

 

 

> Too many TCP connections among TaskManagers for large scale jobs
> ----------------------------------------------------------------
>
>                 Key: FLINK-22643
>                 URL: https://issues.apache.org/jira/browse/FLINK-22643
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>    Affects Versions: 1.14.0, 1.13.2
>            Reporter: Zhilong Hong
>            Assignee: fanrui
>            Priority: Minor
>              Labels: auto-deprioritized-major
>
> For the large scale jobs, there will be too many TCP connections among 
> TaskManagers. Let's take an example.
> For a streaming job with 20 JobVertices, each JobVertex has 500 parallelism. 
> We divide the vertices into 5 slot sharing groups. Each TaskManager has 5 
> slots. Thus there will be 400 taskmanagers in this job. Let's assume that job 
> runs on a cluster with 20 machines.
> If all the job edges are all-to-all edges, there will be 19 * 20 * 399 * 2 = 
> 303,240 TCP connections for each machine. If we run several jobs on this 
> cluster, the TCP connections may exceed the maximum limit of linux, which is 
> 1,048,576. This will stop the TaskManagers from creating new TCP connections 
> and cause task failovers.
> As we run our production jobs on a K8S cluster, the job always failover due 
> to exceptions related to network, such as {{Sending the partition request to 
> 'null' failed}}, and etc.
> We think that we can decrease the number of connections by letting tasks 
> reuse the same connection. We implemented a POC that makes all tasks on the 
> same TaskManager reuse one TCP connection. For the example job we mentioned 
> above, the number of connections will decrease from 303,240 to 15960. With 
> the POC, the frequency of meeting exceptions related to network in our 
> production jobs drops significantly.
> The POC is illustrated in: 
> https://github.com/wsry/flink/commit/bf1c09e80450f40d018a1d1d4fe3dfd2de777fdc
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to