[ https://issues.apache.org/jira/browse/FLINK-18229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352236#comment-17352236 ]

Xintong Song edited comment on FLINK-18229 at 5/27/21, 2:18 AM:
----------------------------------------------------------------

Hi [~nferrario],

Thanks for sharing the information about your problem.

AFAIK, this ticket is no longer blocked on any other efforts. We just haven't 
found enough manpower for it. I'll try to find time for this in the 1.14 
release cycle.

In the meantime, the reason Flink requested 6 more TMs when only 1 TM was lost 
is as follows (a simplified sketch follows the list).
- There's an idle timeout in the JobMaster: if a slot is not used for a 
certain time, it is released.
- Thus, if the job failover takes long, some of the slots might be released by 
the JobMaster and then re-requested very soon after.
- There's a chance that the ResourceManager receives slot requests from the 
JobMaster before learning from a TM heartbeat that the previous slots were 
released, causing more TMs than needed to be requested.
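
To make the race above concrete, here is a minimal, self-contained sketch in 
plain Java (all class and method names are hypothetical, not actual Flink 
code) of how a slot released on idle timeout, combined with a slot request 
that beats the TM heartbeat, leads to one extra TM request:
{code:java}
public class SlotRequestRaceSketch {

    static int pendingTaskManagerRequests = 0;
    static int freeRegisteredSlots = 0;

    // JobMaster side: a slot unused for longer than slot.idle.timeout is
    // released. The ResourceManager only learns this from a later TM heartbeat.
    static void releaseIdleSlot() {
        freeRegisteredSlots++;
    }

    // ResourceManager side: a slot request that arrives before that heartbeat
    // sees no free slot and asks Kubernetes/Yarn for a brand-new TaskManager.
    static void onSlotRequest(boolean heartbeatProcessed) {
        if (heartbeatProcessed && freeRegisteredSlots > 0) {
            freeRegisteredSlots--;          // reuse the slot that was just freed
        } else {
            pendingTaskManagerRequests++;   // request an extra, unneeded TM
        }
    }

    public static void main(String[] args) {
        releaseIdleSlot();     // JobMaster releases a slot during a long failover
        onSlotRequest(false);  // the re-request beats the TM heartbeat
        System.out.println("extra TMs requested: " + pendingTaskManagerRequests);
    }
}
{code}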

To work around this, you can try the following (a config snippet follows the 
list).
- Increase 'slot.idle.timeout'. This reduces the chance that the JobMaster 
releases slots during job failover. The side effect is that, if multiple jobs 
share the same Flink session, it might take longer for a slot released by one 
job to be reused by another.
- For Flink 1.12, you can set 'cluster.declarative-resource-management.enabled' 
to 'true'. That makes the JobMaster declare its total resource requirements to 
the ResourceManager at once, instead of requesting individual slots. That 
should help avoid requesting more resources than needed during failover.
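
For reference, a sketch of the two workarounds as flink-conf.yaml entries; the 
timeout value below is only an example (slot.idle.timeout is in milliseconds):
{code}
# Keep idle slots around longer so they survive a job failover (example value).
slot.idle.timeout: 600000
# Flink 1.12: let the JobMaster declare its total requirements at once.
cluster.declarative-resource-management.enabled: true
{code}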

Notice that the above workarounds only apply to cases where the JobManager 
process has not crashed. In case of a JM pod failure, Flink can still request 
more resources than needed. Thus, this ticket is still valid.



> Pending worker requests should be properly cleared
> --------------------------------------------------
>
>                 Key: FLINK-18229
>                 URL: https://issues.apache.org/jira/browse/FLINK-18229
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Deployment / Kubernetes, Deployment / YARN, Runtime / 
> Coordination
>    Affects Versions: 1.9.3, 1.10.1, 1.11.0
>            Reporter: Xintong Song
>            Priority: Major
>             Fix For: 1.14.0
>
>
> Currently, if Kubernetes/Yarn does not have enough resources to fulfill 
> Flink's resource requirements, there will be pending pod/container requests 
> on Kubernetes/Yarn. These pending resource requests are never cleared until 
> they are either fulfilled or the Flink cluster is shut down.
> However, sometimes Flink no longer needs the pending resources. E.g., the 
> slot request is fulfilled by another slot that becomes available, or the job 
> fails due to a slot request timeout (in a session cluster). In such cases, 
> Flink does not remove the resource request until the resource is allocated; 
> it then discovers that it no longer needs the allocated resource and 
> releases it. This affects the underlying Kubernetes/Yarn cluster, especially 
> when the cluster is under heavy workload.
> It would be good for Flink to cancel pod/container requests as early as 
> possible if it can discover that some of the pending workers are no longer 
> needed.
> There are several approaches that could potentially achieve this.
>  # We can always check whether there's a pending worker request that can be 
> canceled when a {{PendingTaskManagerSlot}} is unassigned.
>  # We can have a separate timeout for requesting a new worker. If the 
> resource cannot be allocated within the given time since it was requested, 
> we should cancel that resource request and report a resource allocation 
> failure.
>  # We can share the same timeout used for starting a new worker (proposed in 
> FLINK-13554). This is similar to 2), but it requires the worker to be 
> registered, rather than allocated, before the timeout.
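
For illustration only, a rough sketch of what approach 2 above could look 
like, under simplified, assumed interfaces (the class and method names below 
are invented for the example, not the actual ResourceManager API):
{code:java}
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of approach 2: give each pending worker request its own
// timeout and cancel the pod/container request if it is not allocated in time.
public class PendingWorkerTimeoutSketch {

    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private final Duration allocationTimeout;

    public PendingWorkerTimeoutSketch(Duration allocationTimeout) {
        this.allocationTimeout = allocationTimeout;
    }

    /** Called when a pod/container request is sent to Kubernetes/Yarn. */
    public ScheduledFuture<?> trackPendingWorker(Runnable cancelPendingRequest) {
        // If the worker is not allocated before the timeout fires, cancel the
        // pending request and treat it as a resource allocation failure.
        return timer.schedule(
                cancelPendingRequest, allocationTimeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    /** Called when the requested worker is allocated in time. */
    public void onWorkerAllocated(ScheduledFuture<?> pendingTimeout) {
        pendingTimeout.cancel(false);
    }
}
{code}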


