Github user mxm commented on the issue:
https://github.com/apache/flink/pull/2571
@KurtYoung It is guaranteed that the ResourceManager will receive an RPC
response of some sort. Either a reply from the TaskExecutor, or a timeout/error
which is returned by the future. If the request is then retried, the
TaskExecutor receives the same request twice but will simply acknowledge it
again. The ResourceManager just keeps retrying. In the worst case, the
TaskExecutor has already freed the slot again because the JobManager doesn't
need it anymore. If the TaskExecutor then reports that the slot is available
again, we know that we can stop retrying.
This requires us to keep an extra list of unconfirmed requests to the
TaskExecutor. If the request is still unconfirmed when the slot is free again
or occupied by a different allocation, we can cancel the retrying and delete
the unconfirmed request. This is slightly more complicated than I initially
thought :)
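
To make the bookkeeping on the ResourceManager side concrete, here is a minimal sketch of such a list of unconfirmed requests. All class and method names are hypothetical and not taken from the Flink code base; slot and allocation ids are plain strings for brevity. It only shows when retrying would stop, not the RPC or the retry timer itself.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch, not Flink code: tracks slot requests awaiting an acknowledgement. */
class UnconfirmedSlotRequests {

    // slotId -> allocationId of the request that has not been acknowledged yet
    private final Map<String, String> pending = new HashMap<>();

    /** Called when the ResourceManager sends (or re-sends) a slot request. */
    void requestSent(String slotId, String allocationId) {
        pending.put(slotId, allocationId);
    }

    /** Called when the TaskExecutor acknowledges the request. */
    void acknowledged(String slotId, String allocationId) {
        // Only removes the entry if it still belongs to this allocation.
        pending.remove(slotId, allocationId);
    }

    /**
     * Called when the TaskExecutor reports the state of a slot.
     * currentAllocationId is null when the slot is free.
     */
    void onSlotReport(String slotId, String currentAllocationId) {
        String wanted = pending.get(slotId);
        // Slot is free again or occupied by a different allocation: stop retrying.
        if (wanted != null && !wanted.equals(currentAllocationId)) {
            pending.remove(slotId);
        }
    }

    /** The retry loop would consult this before sending the request again. */
    boolean shouldRetry(String slotId, String allocationId) {
        return allocationId.equals(pending.get(slotId));
    }
}
```

A timeout or error on the future would simply lead to another `requestSent` as long as `shouldRetry` still returns true.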
There is one more problem, though. How do we prevent a false request from the
ResourceManager to the TaskExecutor in case the ResourceManager hasn't received
a reply from the TaskExecutor but the TaskExecutor has already removed the slot
again (i.e. task has finished)? The slot would be allocated although it is not
needed anymore.
Note that sending back a current allocation list when declining a request
does not cover the case in which a slot has already been released again. The
TaskExecutor may have tried to decline a request and have failed. In the
meantime, the ResourceManager sends the same request again. This results in a
second (duplicate) slot allocation.
Again, the only solution for this problem seems to be to keep a list of
unconfirmed slot allocation removal requests at the TaskExecutor. The
ResourceManager has to acknowledge all slot allocation removals. The
TaskExecutor can then de-duplicate any requests for slot allocations whose
removal message has not been confirmed yet.
Actually, it should suffice to have only one list with unconfirmed slot
allocation removals at the TaskExecutor. The ResourceManager doesn't need a
list to filter because it relies on the TaskExecutor to filter duplicate
requests correctly.
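
A rough sketch of what this single list at the TaskExecutor could look like follows. The names (`TaskExecutorSlots`, `Reply`, `removalConfirmed`) are made up for illustration and are not the PR's actual API; ids are plain strings. The idea is that a freed allocation id stays in the list until the ResourceManager confirms the removal, so a late retry of the old request can be recognized and rejected.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch, not Flink code: de-duplicates slot requests at the TaskExecutor. */
class TaskExecutorSlots {

    enum Reply { ACCEPTED, ALREADY_ALLOCATED, REJECTED_STALE, REJECTED_OCCUPIED }

    // slotId -> allocationId currently holding the slot
    private final Map<String, String> active = new HashMap<>();

    // allocation ids of freed slots whose removal the ResourceManager has not confirmed yet
    private final Set<String> unconfirmedRemovals = new HashSet<>();

    Reply requestSlot(String slotId, String allocationId) {
        // A retry of a request whose allocation was already freed must not allocate again.
        if (unconfirmedRemovals.contains(allocationId)) {
            return Reply.REJECTED_STALE;
        }
        String current = active.get(slotId);
        if (current == null) {
            active.put(slotId, allocationId);
            return Reply.ACCEPTED;
        }
        // The same request received twice is simply acknowledged again.
        return current.equals(allocationId) ? Reply.ALREADY_ALLOCATED : Reply.REJECTED_OCCUPIED;
    }

    /** Frees the slot and remembers its allocation id until the removal is confirmed. */
    void freeSlot(String slotId) {
        String allocationId = active.remove(slotId);
        if (allocationId != null) {
            unconfirmedRemovals.add(allocationId);
        }
    }

    /** Called when the ResourceManager acknowledges the slot allocation removal. */
    void removalConfirmed(String allocationId) {
        unconfirmedRemovals.remove(allocationId);
    }
}
```

With this, the ResourceManager can keep retrying blindly: a duplicate of a live request gets `ALREADY_ALLOCATED`, and a duplicate of a finished one gets `REJECTED_STALE` instead of a second allocation.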
**TL;DR**
I think we need to change the PR title :) Long story short, in addition
to the previously discussed changes, we need the ResourceManager to
confirm slot allocation removals by the TaskExecutor. The TaskExecutor has to
keep around previous allocation ids of freed slots to de-duplicate any old
incoming slot requests from the ResourceManager.
Thank you so much for your feedback. Please tell me if anything is unclear.
You're right that the protocol is quite complex.