Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2571
  
    @KurtYoung It is guaranteed that the ResourceManager will receive an RPC 
response of some sort: either a reply from the TaskExecutor or a timeout/error 
returned by the future. If the request is then retried, the 
TaskExecutor receives the same request twice but will simply acknowledge it 
again. The ResourceManager just keeps retrying. In the worst case, the 
TaskExecutor has already freed the slot again because the JobManager doesn't 
need it anymore. If the TaskExecutor then reports that the slot is available 
again, we know that we can stop retrying.
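
    To make the "acknowledge it again" behaviour concrete, here is a minimal 
sketch of an idempotent slot request on the TaskExecutor side. The class 
`IdempotentSlotTable`, its methods, and the use of plain strings for slot and 
allocation ids are all hypothetical and not the code in this PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch, not Flink's actual TaskExecutor code. */
class IdempotentSlotTable {
    enum Reply { ACKNOWLEDGED, DECLINED }

    // slot id -> allocation id currently holding the slot (absent means free)
    private final Map<String, String> allocations = new HashMap<>();

    /** Handles a slot request; a retry with the same allocation id is acknowledged again. */
    Reply requestSlot(String slotId, String allocationId) {
        String current = allocations.get(slotId);
        if (current == null) {
            // Slot is free: allocate it for this request.
            allocations.put(slotId, allocationId);
            return Reply.ACKNOWLEDGED;
        }
        // Same allocation id means a duplicate of an earlier request.
        return current.equals(allocationId) ? Reply.ACKNOWLEDGED : Reply.DECLINED;
    }

    /** Frees the slot, e.g. because the JobManager no longer needs it. */
    void freeSlot(String slotId) {
        allocations.remove(slotId);
    }

    boolean isFree(String slotId) {
        return !allocations.containsKey(slotId);
    }
}
```

    Note that in this sketch a retry arriving after `freeSlot` would allocate 
the slot a second time, which is the problem discussed further down.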
    
    This requires us to keep an extra list of unconfirmed requests to the 
TaskExecutor. If the request is still unconfirmed when the slot is free again 
or occupied by a different allocation, we can cancel the retrying and delete 
the unconfirmed request. This is slightly more complicated than I initially 
thought :) 
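
    A rough sketch of that bookkeeping on the ResourceManager side, assuming 
slot reports carry the allocation id currently occupying the slot (`null` for 
a free slot). `UnconfirmedRequests` and its method names are made up for 
illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the list of unconfirmed requests. */
class UnconfirmedRequests {
    // slot id -> allocation id of the request that has not been confirmed yet
    private final Map<String, String> pending = new HashMap<>();

    /** Records a request that was sent but not yet acknowledged. */
    void requestSent(String slotId, String allocationId) {
        pending.put(slotId, allocationId);
    }

    /** The TaskExecutor acknowledged the request: nothing left to retry. */
    void confirmed(String slotId, String allocationId) {
        pending.remove(slotId, allocationId);
    }

    /**
     * Processes a slot report. If the slot is free (null) or held by a
     * different allocation, the unconfirmed request is dropped.
     */
    void onSlotReport(String slotId, String reportedAllocationId) {
        String wanted = pending.get(slotId);
        if (wanted != null && !wanted.equals(reportedAllocationId)) {
            pending.remove(slotId);
        }
    }

    /** True while the request for this slot should still be retried. */
    boolean shouldRetry(String slotId) {
        return pending.containsKey(slotId);
    }
}
```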
    
    There is one more problem though. How to prevent a false request from the 
ResourceManager to the TaskExecutor in case the ResourceManager hasn't received 
a reply from the TaskExecutor but the TaskExecutor has already removed the slot 
again (i.e. task has finished)? The slot would be allocated although it is not 
needed anymore.
    
    Note that sending back a current allocation list when declining a request 
does not cover the case in which a slot has already been released again. The 
TaskExecutor may have tried to decline a request and have failed. In the 
meantime, the ResourceManager sends the same request again. This results in a 
second (duplicate) slot allocation.
    
    Again, the only solution for this problem seems to be to keep a list of 
unconfirmed slot allocation removal requests at the TaskExecutor. The 
ResourceManager has to acknowledge all slot allocation removals. The 
TaskExecutor can then de-duplicate any request for a slot whose removal 
message has not been confirmed yet.
    
    Actually, it should suffice to have only one list with unconfirmed slot 
allocation removals at the TaskExecutor. The ResourceManager doesn't need a 
list to filter because it relies on the TaskExecutor to filter duplicate 
requests correctly.
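
    As a sketch of how that single list could work, assuming the 
TaskExecutor remembers the allocation id of every freed slot until the 
ResourceManager confirms the removal. `DeduplicatingSlotTable`, 
`removalConfirmed`, and the string ids are hypothetical names, not existing 
Flink APIs.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of TaskExecutor-side de-duplication. */
class DeduplicatingSlotTable {
    // slot id -> allocation id currently holding the slot
    private final Map<String, String> allocations = new HashMap<>();
    // allocation ids of freed slots whose removal is not yet confirmed
    private final Set<String> unconfirmedRemovals = new HashSet<>();

    /** Returns true if the request is acknowledged, false if it is rejected. */
    boolean requestSlot(String slotId, String allocationId) {
        if (unconfirmedRemovals.contains(allocationId)) {
            // Stale retry for an allocation that was already freed.
            return false;
        }
        String current = allocations.get(slotId);
        if (current == null) {
            allocations.put(slotId, allocationId);
            return true;
        }
        // Duplicate of the active allocation is acknowledged again.
        return current.equals(allocationId);
    }

    /** Frees the slot and remembers its allocation id until confirmed. */
    void freeSlot(String slotId) {
        String allocationId = allocations.remove(slotId);
        if (allocationId != null) {
            unconfirmedRemovals.add(allocationId);
        }
    }

    /** The ResourceManager acknowledged the removal: forget the old id. */
    void removalConfirmed(String allocationId) {
        unconfirmedRemovals.remove(allocationId);
    }

    boolean isFree(String slotId) {
        return !allocations.containsKey(slotId);
    }
}
```

    The sketch relies on the ResourceManager no longer retrying a request 
once it has confirmed the corresponding removal.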
    
    **TL;DR**
    I think we need to change the PR title 😏 Long story short, in addition 
to the previously discussed changes, we need the ResourceManager to 
confirm slot allocation removals by the TaskExecutor. The TaskExecutor has to 
keep around previous allocation ids of freed slots to de-duplicate any old 
incoming slot requests from the ResourceManager.
    
    Thank you so much for your feedback. Please tell me if anything is unclear. 
You're right that the protocol is quite complex.

