[GitHub] flink issue #2571: [FLINK-4348] Simplify logic of SlotManager
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2571 Merged --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2571 Rebased to latest `flip-6`.
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2571 Thanks for the comments @KurtYoung. I've updated the PR. No worries, four eyes always see more than two and it is natural that ideas progress over time :)
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2571 As discussed in this PR, I've pushed the following changes:
1. Move the slot registration and allocation report to the registration of the TaskExecutor.
2. Let the TaskExecutor immediately notify the ResourceManager once a slot becomes free. The ResourceManager has to confirm this notification; otherwise, future slot allocations will be blocked because the ResourceManager's state is not in sync.
3. Change the fencing in handleSlotRequestFailedAtTaskManager to protect against TaskExecutors which are not registered anymore.
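Change (2) above could be sketched roughly as follows. This is a hypothetical illustration with invented names (`FreeSlotNotifier`, `slotBecameFree`, etc. are not the actual flip-6 classes): the TaskExecutor keeps each "slot freed" notification pending until the ResourceManager confirms it, and keeps retrying, so the RM's view of the slot cannot silently drift out of sync.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the TaskExecutor side of the free-slot notification.
class FreeSlotNotifier {
    private final Set<String> unconfirmed = new HashSet<>();

    // Called when a task finishes and its slot becomes free.
    void slotBecameFree(String slotId) {
        unconfirmed.add(slotId);   // remember until the RM acknowledges
        sendNotification(slotId);  // fire the RPC (may be lost)
    }

    // Called when the ResourceManager acknowledges the notification.
    void confirmationReceived(String slotId) {
        unconfirmed.remove(slotId);
    }

    // Retried (e.g. on a timer) for every notification still unconfirmed.
    void retryPending() {
        for (String slotId : unconfirmed) {
            sendNotification(slotId);
        }
    }

    boolean isUnconfirmed(String slotId) {
        return unconfirmed.contains(slotId);
    }

    private void sendNotification(String slotId) {
        // placeholder for the actual RPC to the ResourceManager
    }
}
```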
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2571 @KurtYoung I see, that makes sense. When the TM loses its connection to the RM, this will eventually be detected by the heartbeating, and the list of pending slot allocation removal requests will be cleared. TM and RM will re-sync the slot status on reconnecting.
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/2571 @mxm Regarding the last heartbeat thing, what I was really trying to say is that the failing free request will not be stuck in the unconfirmed list forever, since we have the heartbeat manager to monitor the network connection between RM and TM. If the network is really broken, they will treat each other as dead and take the corresponding actions.
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2571 Great to hear that we're on the same page :)

>I think there's no need to stick to the failed slot when the allocation fails by RPC. Just put it back to the free pool, and give us another shot.

Yes, we can simply trigger processing of pending requests via `handleFreeSlot`.

>Actually, I think the pending requests act like your extra list of unconfirmed requests. (And as you pointed out last, we actually don't need this list since the TaskManager will correct our fault by rejecting the allocation.)

I think PendingRequests is not the same, because it is a list of outstanding requests, not of requests that have been issued to TaskExecutors. But as we found out, we don't need a special list for that on the ResourceManager side.

>Yes, I also thought this might be a solution. And I think this can work with the heartbeat manager, since if you cannot send the free message to the RM, you will not be able to send heartbeats either. After some timeout, the RM will treat the TaskManager as dead, and some garbage collection logic in the RM will take care of all the allocations and slots which belong to this TaskManager.

Are you saying you would rather let the HeartbeatManager send out the removal of slots? That would work, but depending on the heartbeat interval this could take slightly longer. Semantically, it doesn't make much difference.
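The "put it back to the free pool and give it another shot" behavior around `handleFreeSlot` could look roughly like this. A hypothetical queue-based sketch with illustrative names (only `handleFreeSlot` comes from the discussion; the rest is invented for the example): when an allocation fails over RPC, the request is not pinned to the failed slot; the slot simply returns to the free pool and the pending requests get matched against it again.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of retrying pending slot requests via handleFreeSlot.
class PendingRequestProcessor {
    private final Deque<String> pendingRequests = new ArrayDeque<>();
    private final Deque<String> freeSlots = new ArrayDeque<>();

    void requestSlot(String allocationId) {
        pendingRequests.addLast(allocationId);
        tryFulfill();
    }

    // Allocation failed at the TaskExecutor: re-queue the request and
    // return the slot to the free pool, which immediately retries.
    void allocationFailed(String slotId, String allocationId) {
        pendingRequests.addFirst(allocationId);
        handleFreeSlot(slotId);
    }

    void handleFreeSlot(String slotId) {
        freeSlots.addLast(slotId);
        tryFulfill();
    }

    private void tryFulfill() {
        while (!pendingRequests.isEmpty() && !freeSlots.isEmpty()) {
            String slotId = freeSlots.pollFirst();
            String allocationId = pendingRequests.pollFirst();
            sendAllocationRequest(slotId, allocationId);
        }
    }

    int pendingCount() { return pendingRequests.size(); }
    int freeCount() { return freeSlots.size(); }

    private void sendAllocationRequest(String slotId, String allocationId) {
        // placeholder for the actual RPC to the TaskExecutor
    }
}
```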
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/2571 @mxm Thanks for your thoughts, I really like the discussion. :smirk: You just pointed out another point which I missed in the previous reply. Actually, I noticed that right after I posted the previous reply; as it turns out, my solution was exactly the same as you proposed! Some comments inline:

> It is guaranteed that the ResourceManager will receive an RPC response of some sort. Either a reply from the TaskExecutor, or a timeout/error which is returned by the future. If the request is then retried, the TaskExecutor receives the same request twice but will simply acknowledge it again

I think there's no need to stick to the failed slot when the allocation fails by RPC. Just put it back to the free pool, and give us another shot. Actually, I think the pending requests act like your extra list of unconfirmed requests. (And as you pointed out last, we actually don't need this list, since the TaskManager will correct our fault by rejecting the allocation.)

>There is one more problem though. How to prevent a false request from the ResourceManager to the TaskExecutor in case the ResourceManager hasn't received a reply from the TaskExecutor but the TaskExecutor has already removed the slot again

When the allocation fails by RPC and we only have one free slot, it's true that we will keep retrying the same slot and keep failing by RPC. When the task has actually finished, the slot becomes free again. Then our request reaches the TaskManager. It's ok for the TaskManager to accept the request; in the end, the JobManager will reject this allocation, and the slot will become free again.

>Again, the only solution for this problem seems to be to keep a list of unconfirmed slot allocation removal requests at the TaskExecutor.

Yes, I also thought this might be a solution. And I think this can work with the heartbeat manager, since if you cannot send the free message to the RM, you will not be able to send heartbeats either. After some timeout, the RM will treat the TaskManager as dead, and some garbage collection logic in the RM will take care of all the allocations and slots which belong to this TaskManager. All in all, I think this version is still much simpler than the first one. :smirk:
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2571 @KurtYoung It is guaranteed that the ResourceManager will receive an RPC response of some sort: either a reply from the TaskExecutor, or a timeout/error which is returned by the future. If the request is then retried, the TaskExecutor receives the same request twice but will simply acknowledge it again. The ResourceManager just keeps retrying. In the worst case, the TaskExecutor has already freed the slot again because the JobManager doesn't need it anymore. If the TaskExecutor then reports that the slot is available again, we know that we can stop retrying. This requires us to keep an extra list of unconfirmed requests to the TaskExecutor. If the request is still unconfirmed when the slot is free again or occupied by a different allocation, we can cancel the retrying and delete the unconfirmed request. This is slightly more complicated than I initially thought :)

There is one more problem though. How do we prevent a false request from the ResourceManager to the TaskExecutor in case the ResourceManager hasn't received a reply from the TaskExecutor but the TaskExecutor has already removed the slot again (i.e. the task has finished)? The slot would be allocated although it is not needed anymore. Note that sending back a current allocation list when declining a request does not cover the case in which a slot has already been released again. The TaskExecutor may have tried to decline a request and failed. In the meantime, the ResourceManager sends the same request again. This results in a second (duplicate) slot allocation. Again, the only solution for this problem seems to be to keep a list of unconfirmed slot allocation removal requests at the TaskExecutor. The ResourceManager has to acknowledge all slot allocation removals. The TaskExecutor can then de-duplicate any requests for slots whose removal message it hasn't received a confirmation for.

Actually, it should suffice to have only one list with unconfirmed slot allocation removals at the TaskExecutor. The ResourceManager doesn't need a list to filter because it relies on the TaskExecutor to filter duplicate requests correctly. **TL;DR** I think we need to change the PR title :) Long story short: in addition to the previously discussed changes, we need the ResourceManager to confirm slot allocation removals by the TaskExecutor. The TaskExecutor has to keep around the previous allocation ids of freed slots to de-duplicate any old incoming slot requests from the ResourceManager. Thank you so much for your feedback. Please tell me if anything is unclear. You're right that the protocol is quite complex.
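The de-duplication described above could be sketched like this. A hypothetical illustration (the class and method names are invented, not the real flip-6 API): the TaskExecutor remembers the allocation ids of slots it has freed but whose removal the ResourceManager has not yet confirmed; an incoming slot request carrying such an id must be a stale duplicate and is rejected.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the TaskExecutor-side de-duplication list.
class AllocationDeduplicator {
    private final Set<String> unconfirmedRemovals = new HashSet<>();

    // The slot holding this allocation was freed; removal is now pending
    // confirmation by the ResourceManager.
    void slotFreed(String allocationId) {
        unconfirmedRemovals.add(allocationId);
    }

    // The ResourceManager acknowledged the removal; the id can be forgotten.
    void removalConfirmed(String allocationId) {
        unconfirmedRemovals.remove(allocationId);
    }

    // true if the incoming slot request should be accepted; false if it is a
    // stale duplicate referring to an allocation that was already released.
    boolean acceptSlotRequest(String allocationId) {
        return !unconfirmedRemovals.contains(allocationId);
    }
}
```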
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/2571 @mxm I think that situation will happen when the RM requests a slot from the TM, the TM accepts the request, but the response is somehow lost. Typically, when an RPC error occurs, the sender cannot know whether the receiver handled the request or not. The changes look good to me.
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2571 Thanks for the feedback, @beyond1920 and @KurtYoung. You're right, the changes don't allow slots to be released by a TaskExecutor. We can change that by explicitly reporting to the RM when a slot becomes free. This may also decrease latency in case a task finishes and new ones are waiting to be deployed.

>b. When we handleSlotRequestFailedAtTaskManager, we will make this slot free again. If the slot is occupied by some other task now, we will continuously fail for all allocations on this slot. (this can be fixed by 3)

How can that happen? The slot will not appear free while it is allocated at the TaskExecutor. When allocation fails, it is marked as free and then the request is retried immediately. It must succeed eventually if the initial decision to allocate the slot was correct. However, we need to explicitly check whether a TaskExecutor has deregistered, to make sure old TaskExecutors don't send failures which would trigger slot allocation of already removed slots (due to TaskExecutor deregistration). That should be fixed with this PR.

> 1. We can remove the update status part entirely, since it can only do new slot registration now, we can just move it to the task executor first registration.

Very good suggestion. Let's move the initial registration and reconciliation of slots to the registration message. To wrap up, let's change the following:
1. Move the slot registration and allocation report to the registration of the TaskExecutor.
2. Let the TaskExecutor immediately notify the ResourceManager once a slot becomes free.
3. Change the fencing in handleSlotRequestFailedAtTaskManager to protect against TaskExecutors which are not registered anymore.

Let me know if that would work for you.
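The fencing check discussed above could be sketched as follows. A hypothetical illustration with invented names (only `handleSlotRequestFailedAtTaskManager` comes from the PR; the rest is not the actual SlotManager code): failure reports from TaskExecutors that have deregistered in the meantime are simply dropped, so stale failures cannot trigger allocations of already removed slots.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of fencing against deregistered TaskExecutors.
class SlotManagerSketch {
    private final Set<String> registeredTaskExecutors = new HashSet<>();
    private final Set<String> freeSlots = new HashSet<>();

    void registerTaskExecutor(String resourceId) {
        registeredTaskExecutors.add(resourceId);
    }

    void unregisterTaskExecutor(String resourceId) {
        registeredTaskExecutors.remove(resourceId);
        // in the real SlotManager, this TaskExecutor's slots would be removed here too
    }

    void handleSlotRequestFailedAtTaskManager(String resourceId, String slotId) {
        // fencing: ignore failures reported for TaskExecutors no longer registered
        if (!registeredTaskExecutors.contains(resourceId)) {
            return;
        }
        // mark the slot free again; pending requests would be retried against it
        freeSlots.add(slotId);
    }

    boolean isFree(String slotId) {
        return freeSlots.contains(slotId);
    }
}
```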
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/2571 @mxm Thanks for the simplification, I like the idea. When I wrote the first version of the SlotManager, I noticed that maybe I had made things too complicated, but I didn't figure out how to simplify them. As it turns out, your modification covers the two main problems I faced:
1. What information to exchange during heartbeats, and what actions we should take.
2. What action we should take when the allocation fails at the TaskManager.

But what I really want to find out is: in this case, is there a simple paradigm we can follow to make the whole thing clear and robust? What I previously chose was: take actions based on my newest runtime information. But as you can see, it led me to a very complex solution; each time I decide what action should be taken, I have to check all related information and consider all possibilities (even when it seems hard to understand why they could happen). Your modification gives me some tips; maybe we can simplify it in the following ways:
1. RM and TM only exchange information when needed (so heartbeats don't sync status).
2. TM only reports information which it can change by itself (like a slot becoming free again).

Here are some thoughts about the modification:
1. We can remove the update status part entirely; since it can only do new slot registration now, we can just move it to the TaskExecutor's first registration.
2. Once a slot becomes free in the TM, notify the RM.
3. The TM should attach the slot usage when rejecting an allocation from the RM.

Here are some minor problems I found in this modification:
a. As beyond1920 mentioned, we don't have a way to find out when a slot becomes free (this can be solved by 2).
b. When we handleSlotRequestFailedAtTaskManager, we will make this slot free again. If the slot is occupied by some other task now, we will continuously fail for all allocations on this slot (this can be fixed by 3).
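Point (3) above, attaching the slot usage to a rejection, could be sketched like this. A hypothetical illustration (the message and class names are invented, not Flink's actual RPC messages): the rejection carries the allocation that currently occupies the slot, so the ResourceManager can correct its view instead of repeatedly retrying the same slot.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical rejection message carrying the slot's current usage.
class SlotAllocationRejected {
    final String slotId;
    final String occupyingAllocationId; // who actually occupies the slot

    SlotAllocationRejected(String slotId, String occupyingAllocationId) {
        this.slotId = slotId;
        this.occupyingAllocationId = occupyingAllocationId;
    }
}

// Hypothetical sketch of the ResourceManager's view of slot allocations.
class ResourceManagerView {
    private final Map<String, String> slotToAllocation = new HashMap<>();

    // What the RM believes a slot is allocated to.
    void assume(String slotId, String allocationId) {
        slotToAllocation.put(slotId, allocationId);
    }

    // Apply the usage attached to a rejection: sync the RM's view with the TM's.
    void onRejection(SlotAllocationRejected rejection) {
        slotToAllocation.put(rejection.slotId, rejection.occupyingAllocationId);
    }

    String allocationOf(String slotId) {
        return slotToAllocation.get(slotId);
    }
}
```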
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/2571 @mxm, What would happen under the following case: the taskExecutor releases a registered slot, then the taskExecutor reports its latest slotReport to the ResourceManager; the ResourceManager should remove the old slot allocation from its own view and mark the slot free. So I think we should keep the following code in the old SlotManager `updateSlotStatus`:

```java
else { // slot is reported empty
    // check whether we also thought this slot is empty
    if (allocationMap.isAllocated(slotId)) {
        LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null",
            slotId, allocationMap.getAllocationID(slotId));
        // we thought the slot is in use, correct it
        allocationMap.removeAllocation(slotId);
        // we have a free slot!
        handleFreeSlot(slot);
    }
}
```
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2571 CC @beyond1920 @KurtYoung