[ 
https://issues.apache.org/jira/browse/FLINK-21884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17454589#comment-17454589
 ] 

ZhuoYu Chen edited comment on FLINK-21884 at 12/7/21, 11:16 AM:
----------------------------------------------------------------

[~trohrmann] 
As an HDFS cluster ages, some nodes inevitably suffer "performance degradation",
mainly manifested as slow disk reads/writes and slow network transmission; we
collectively call these slow nodes. Once a cluster grows to a certain size, for
example thousands of nodes, slow nodes are usually not easy to detect. Most of
the time they hide among the many healthy nodes, and the problem is only
noticed when a client repeatedly accesses these problematic nodes and finds
that reads and writes are slow.

*Slow network monitoring principle*

The principle of detecting slow network transmission on a DN is to record the
data transmission time between each pair of DNs in the cluster, identify
abnormal values, and report them to the NN as slow nodes. Under normal
circumstances the transmission rates between nodes are roughly the same, with
no large differences. If the transmission time from A to B is abnormal, A
reports B to the NN as a slow node, and the NN decides whether a node is really
slow after aggregating all slow node reports. For example, if node X is faulty,
many nodes will report X to the NN as slow, so similar slow node reports
accumulate on the NN side, indicating that X is a slow node.
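
A minimal sketch of what the NN-side aggregation could look like (the class and
method names below are hypothetical and are not the actual HDFS SlowPeerTracker
API): a node is flagged as slow once enough distinct DNs have reported it.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical NN-side aggregator: a node is treated as slow once it has been
// reported by at least minReporters distinct DNs.
public class SlowNodeAggregator {
    private final Map<String, Set<String>> reportersByNode = new HashMap<>();
    private final int minReporters;

    public SlowNodeAggregator(int minReporters) {
        this.minReporters = minReporters;
    }

    // Called when reporterIp observes abnormally slow transfers to reportedIp.
    public synchronized void report(String reporterIp, String reportedIp) {
        reportersByNode.computeIfAbsent(reportedIp, k -> new HashSet<>()).add(reporterIp);
    }

    // Nodes reported by enough distinct peers are considered slow.
    public synchronized Set<String> getSlowNodes() {
        Set<String> slow = new HashSet<>();
        reportersByNode.forEach((node, reporters) -> {
            if (reporters.size() >= minReporters) {
                slow.add(node);
            }
        });
        return slow;
    }
}
{code}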

To calculate the average elapsed time of data transmission from a DN to its
downstream DNs, the DN maintains a HashMap<String, Queue<SampleStat>> whose key
is the IP of the downstream DN and whose value is a queue of SampleStat
objects, each recording the number of packets sent and the total elapsed time.
By default, the DN takes a snapshot every five minutes and generates a
SampleStat that records the network behaviour of data transmission to that
downstream DN during those five minutes; the SampleStat is then stored in the
corresponding Queue of the HashMap.
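
A rough sketch of that bookkeeping (SampleStat here is a simplified stand-in,
and all names are illustrative rather than the real DataNode classes):

{code:java}
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Simplified stand-in for one five-minute snapshot: packets sent to a
// downstream DN and the total time spent sending them.
class SampleStat {
    long packetCount;
    long totalElapsedNanos;

    double averageElapsedNanos() {
        return packetCount == 0 ? 0.0 : (double) totalElapsedNanos / packetCount;
    }
}

// Per-DN bookkeeping: one queue of snapshots per downstream peer IP.
class DownstreamTransferStats {
    private final Map<String, Queue<SampleStat>> statsByPeer = new HashMap<>();

    // Invoked every five minutes with the snapshot of the finished window.
    void addSnapshot(String downstreamIp, SampleStat snapshot) {
        statsByPeer.computeIfAbsent(downstreamIp, k -> new ArrayDeque<>()).add(snapshot);
    }
}
{code}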

Each Queue in the HashMap is a fixed-size queue with a length of 36. When the
queue is full, the oldest member is evicted so that the new SampleStat can be
appended to the end. This means only the last 3 hours (36 x 5 = 180 min) of
network transmission are monitored, which keeps the monitoring data
time-sensitive.
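
The eviction can be sketched roughly as follows (reusing the SampleStat
stand-in from the previous sketch; again not the actual DataNode code):

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;

// Rolling window of snapshots: 36 snapshots x 5 minutes = 180 minutes.
class RollingSampleWindow {
    private static final int MAX_SNAPSHOTS = 36;

    private final Queue<SampleStat> window = new ArrayDeque<>(MAX_SNAPSHOTS);

    // Append the newest snapshot, dropping the oldest one when full so that
    // only the last 3 hours of transfer statistics are retained.
    synchronized void add(SampleStat snapshot) {
        if (window.size() == MAX_SNAPSHOTS) {
            window.poll();
        }
        window.add(snapshot);
    }
}
{code}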



> Reduce TaskManager failure detection time
> -----------------------------------------
>
>                 Key: FLINK-21884
>                 URL: https://issues.apache.org/jira/browse/FLINK-21884
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.14.0, 1.13.2
>            Reporter: Robert Metzger
>            Priority: Critical
>              Labels: reactive
>             Fix For: 1.15.0
>
>         Attachments: image-2021-03-19-20-10-40-324.png
>
>
> In Flink 1.13 (and older versions), TaskManager failures stall the processing 
> for a significant amount of time, even though the system gets indications for 
> the failure almost immediately through network connection losses.
> This is due to a high (default) heartbeat timeout of 50 seconds [1] to 
> accommodate GC pauses, transient network disruptions or generally slow 
> environments (otherwise, we would unregister a healthy TaskManager).
> Such a high timeout can lead to disruptions in the processing (no processing 
> for certain periods, high latencies, buildup of consumer lag etc.). In 
> Reactive Mode (FLINK-10407), the issue surfaces on scale-down events, where 
> the loss of a TaskManager is immediately visible in the logs, but the job is 
> stuck in "FAILING" for quite a while until the TaskManager is really 
> deregistered. (Note that this issue is not that critical in an autoscaling 
> setup, because Flink can control the scale-down events and trigger them 
> proactively)
> On the attached metrics dashboard, one can see that the job has significant 
> throughput drops / consumer lags during scale down (and also CPU usage spikes 
> on processing the queued events, leading to incorrect scale up events again).
>  !image-2021-03-19-20-10-40-324.png|thumbnail!
> One idea to solve this problem is to:
> - Score TaskManagers based on certain signals (# exceptions reported, 
> exception types (connection losses, akka failures), failure frequencies,  
> ...) and blacklist them accordingly.
> - Introduce a best-effort TaskManager unregistration mechanism: When a 
> TaskManager receives a sigterm, it sends a final message to the JobManager 
> saying "goodbye", and the JobManager can immediately remove the TM from its 
> bookkeeping.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#heartbeat-timeout
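
As an illustration only of the best-effort unregistration idea sketched in the
issue above, a hypothetical shutdown hook could look like this (none of these
types are Flink APIs; the gateway interface and method names are made up for
the sketch):

{code:java}
// Hypothetical illustration of the "goodbye on SIGTERM" proposal: the
// TaskManager sends a final message so the JobManager can drop it immediately
// instead of waiting for the heartbeat timeout.
public class GoodbyeOnShutdown {

    interface JobManagerGateway {
        // Best-effort notification; the JobManager removes the TM from its
        // bookkeeping as soon as this message arrives.
        void disconnectTaskManager(String taskManagerId, String cause);
    }

    public static void install(JobManagerGateway jobManager, String taskManagerId) {
        // SIGTERM triggers JVM shutdown hooks, so this runs before the process exits.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                jobManager.disconnectTaskManager(taskManagerId, "TaskManager is shutting down");
            } catch (Exception e) {
                // Best effort only: if the message is lost, the heartbeat
                // timeout still acts as the fallback detection mechanism.
            }
        }));
    }
}
{code}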



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
