Hwanju Kim created FLINK-14589:
----------------------------------
Summary: Redundant slot requests with the same AllocationID lead
to an inconsistent slot table
Key: FLINK-14589
URL: https://issues.apache.org/jira/browse/FLINK-14589
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 1.6.3
Reporter: Hwanju Kim
_NOTE: We found this issue in 1.6.2, but I checked that the relevant code is still
in the mainline. What I am not sure about, however, is whether other slot-related
fixes after 1.6.2 (such as FLINK-11059 and FLINK-12863, etc.) would prevent the
initial cause of this issue from happening. So far, I have not found a fix
related to the issue I am describing here, so I am opening this one. Please
feel free to deduplicate this if another issue already covers it. Please note
that we have already picked up FLINK-9912, which turned out to be a major fix to
the slot allocation failure issue. I will note the ramification to that issue just
in case others experience the same problem._
h2. Summary
When *requestSlot* is called from ResourceManager (RM) to TaskManager (TM), TM
first reserves the requested slot, marking it as ALLOCATED, offers the slot to
JM, and marks the slot as ACTIVE once it gets an acknowledgement from JM. This
three-way communication for slot allocation is identified by AllocationID,
which is initially generated by JM. TM reserves a slot by calling
*TaskSlotTable.allocateSlot* if the requested slot number (i.e., slot index) is
free to use. The main data structure is *TaskSlot*, indexed by slot index. Once
the slot is marked as ALLOCATED with a given AllocationID, it then updates
other maps such as *allocationIDTaskSlotMap* keyed by AllocationID and
*slotsPerJob* keyed by JobID. When updating *allocationIDTaskSlotMap*,
it directly calls *allocationIDTaskSlotMap.put(allocationId, taskSlot)*,
which overwrites an existing entry if one is already there with the same
AllocationID. This renders *TaskSlot* and *allocationIDTaskSlotMap*
inconsistent: the former says two slots are allocated by the same AllocationID,
while the latter says the AllocationID holds only the latest task slot.
In this state, once the slot is freed, *freeSlot* is driven by AllocationID,
so it fetches the slot index (i.e., the one from the request that arrived
later) from *allocationIDTaskSlotMap*, marks that slot free, and removes the
entry from *allocationIDTaskSlotMap*. The old task slot, however, remains
marked as allocated. This old task slot becomes a zombie and can never be
freed. This can cause permanent slot allocation failures if TM slots are
statically and tightly provisioned and the resource manager does not actively
spawn new TMs when slots are unavailable (e.g., Kubernetes without active-mode
integration, which is not yet available).
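To illustrate the interaction between the per-index slot state and *allocationIDTaskSlotMap*, here is a minimal, hypothetical Java sketch. The class, field, and method names are simplified stand-ins and do not reflect the actual Flink implementation:
{code:java}
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the TaskSlotTable state, only to
// illustrate the overwrite described above; not the actual Flink code.
class SimplifiedSlotTable {
    enum State { FREE, ALLOCATED }

    // Per-slot state, keyed by slot index (stand-in for the TaskSlot array).
    final State[] taskSlots;
    // AllocationID currently holding each slot, if any.
    final String[] slotOwner;
    // AllocationID -> slot index (stand-in for allocationIDTaskSlotMap).
    final Map<String, Integer> allocationIdToSlot = new HashMap<>();

    SimplifiedSlotTable(int numSlots) {
        taskSlots = new State[numSlots];
        slotOwner = new String[numSlots];
        Arrays.fill(taskSlots, State.FREE);
    }

    boolean allocateSlot(int index, String allocationId) {
        if (taskSlots[index] != State.FREE) {
            return false; // the slot itself is already occupied
        }
        taskSlots[index] = State.ALLOCATED;
        slotOwner[index] = allocationId;
        // Problem spot: an unconditional put silently drops any slot index
        // that was already registered for the same AllocationID.
        allocationIdToSlot.put(allocationId, index);
        return true;
    }

    void freeSlot(String allocationId) {
        // Freeing is driven by AllocationID, so only the latest slot index is
        // looked up and released; an older slot allocated under the same
        // AllocationID stays ALLOCATED forever.
        Integer index = allocationIdToSlot.remove(allocationId);
        if (index != null) {
            taskSlots[index] = State.FREE;
            slotOwner[index] = null;
        }
    }
}
{code}
The unconditional put in allocateSlot and the AllocationID-driven freeSlot are the two spots that interact badly once the same AllocationID ends up in two slots.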
h2. Scenario
From my observation, redundant slot requests with the same AllocationID
and different slot indices should be rare, but they can happen due to a race
condition, especially when repeated fail-overs and heartbeat timeouts
(primarily caused by transient resource overload, not permanent network
partition/node outage) are taking place. The following is a detailed scenario
that could lead to this issue (AID is AllocationID); a short sketch replaying
the key steps follows the list:
# AID1 is requested from JM and put in the pending request queue in RM.
# RM picks up slot number 1 (Slot1) from freeSlots and performs requestSlot
with Slot1 and AID1. This slot request is now in flight.
# In the meantime, Slot1 is occupied by AID2 in TM due to a delayed slot request,
and TM sends a slot report via heartbeat to RM saying that Slot1 is already
allocated with AID2.
# RM's heartbeat handler identifies that Slot1 is occupied by a different
AID (AID2), so it rejects the pending request sent in step 2.
# handleFailedSlotRequest puts the rejected AID1 back into the pending requests
by retrying the slot request. This time it picks up another available slot, say
Slot2, so the retried slot request with Slot2 and AID1 is in flight.
# In the meantime, Slot1 occupied by AID2 is freed (by a disconnection from
JM, or by releasing all the tasks in the slot on cancellation/failure - the
latter is what we observed).
# The in-flight slot request (Slot1, AID1) from step 2 arrives at TM and
succeeds, as Slot1 is free to allocate. TM offers Slot1 to JM, which
acknowledges it, so TM marks Slot1 ACTIVE with AID1. At this point,
allocationIDTaskSlotMap[AID1] = Slot1 in TM, and JM's allocatedSlots[AID1] = Slot1.
# The next in-flight slot request (Slot2, AID1) from step 5 arrives at TM. As
Slot2 is still free, TM marks it ALLOCATED, offers Slot2 to JM, and
*overwrites allocationIDTaskSlotMap[AID1] to Slot2*.
# From step 7, JM has allocatedSlots[AID1] = Slot1, which leads JM to reject the
offer, as the same AID is already occupied by another slot.
# TM gets the rejected offer for (Slot2, AID1) and frees Slot2. As part of
that, it removes allocationIDTaskSlotMap[AID1]. *Here Slot1 is still marked as
ALLOCATED with AID1 but allocationIDTaskSlotMap contains nothing for AID1.*
# From this point on, RM believes that Slot1 is allocated for AID1, and so does
JM, which proceeds with task deployment for AID1. In TM, however, AID1 is not
allocated at all since allocationIDTaskSlotMap[AID1] = null. Task deployment
fails with *TaskSubmissionException("No task slot allocated for job ID")*.
# Any slot release from JM (by another heartbeat timeout) removes the
allocated slot (Slot1, AID1) from allocatedSlots and availableSlots, while
freeing the slot with AID1 in TM is a no-op because allocationIDTaskSlotMap[AID1] =
null.
# Any further scheduling fails with *NoResourceAvailableException("Could
not allocate all requires slots within timeout")*, unless an active resource
manager is used.
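To make the end state concrete, here is a purely illustrative driver that replays steps 7, 8, and 10 against the simplified table sketched in the Summary section (hypothetical code, using slot indices 1 and 2 for Slot1 and Slot2):
{code:java}
public class ZombieSlotRepro {
    public static void main(String[] args) {
        SimplifiedSlotTable table = new SimplifiedSlotTable(3);

        // Step 7: the delayed request (Slot1, AID1) succeeds and JM accepts the offer.
        table.allocateSlot(1, "AID1"); // taskSlots[1] = ALLOCATED, map[AID1] = 1

        // Step 8: the retried request (Slot2, AID1) also succeeds on the TM side,
        // silently overwriting the map entry before JM rejects the offer.
        table.allocateSlot(2, "AID1"); // taskSlots[2] = ALLOCATED, map[AID1] = 2

        // Step 10: JM rejects the second offer, so TM frees the slot by AllocationID.
        table.freeSlot("AID1"); // frees Slot2 only; the AID1 entry is removed

        // End state: Slot1 is still ALLOCATED with AID1, yet no lookup by
        // AllocationID can ever reach it again - the zombie slot.
        System.out.println(table.taskSlots[1]);                   // ALLOCATED
        System.out.println(table.allocationIdToSlot.get("AID1")); // null
    }
}
{code}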
h2. Repro method
The repro I have is more stress-based than deterministic: a constantly failing
app (which also ignores cancellation, to prolong fail-over) combined with a
short heartbeat timeout (<=1s) to emulate resource overload without imposing an
actual overload (I enabled DEBUG logging, which could also add a bit more load).
Apart from the repro test, the real applications that ran into this issue were
heavily loaded (high slot oversubscription) and doing continuous fail-overs
(caused by a code error).
h2. Experiment
I've tested a simple solution: have *TaskSlotTable.allocateSlot* check whether
*allocationIDTaskSlotMap* already contains a task slot entry for the requested
AID, and if so, reject the allocation up front. The redundant slot request then
fails fast without reaching reservation and the slot offer, preventing the
allocationIDTaskSlotMap overwrite and the inconsistent allocation tables. In the
above example, the fix replaces steps 8, 9, and 10 with RM getting a
*SlotAllocationException* while handling the failed slot request. RM may retry
the pending request via handleFailedSlotRequest, but it stops retrying once the
next heartbeat reports that the pending slot was allocated by another AID, or
eventually on slot request timeout. My test with this fix has run for a while,
hitting this race multiple times, and survived without slot allocation
failure.
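For reference, here is a rough sketch of such a guard, written against the simplified table from the Summary section. This is an assumed shape for illustration, not the actual patch I tested against *TaskSlotTable*:
{code:java}
boolean allocateSlot(int index, String allocationId) {
    if (allocationIdToSlot.containsKey(allocationId)) {
        // A slot is already registered for this AllocationID: fail fast so the
        // caller can surface a SlotAllocationException back to RM instead of
        // overwriting the existing mapping and offering a redundant slot.
        return false;
    }
    if (taskSlots[index] != State.FREE) {
        return false; // the requested slot index is already occupied
    }
    taskSlots[index] = State.ALLOCATED;
    slotOwner[index] = allocationId;
    allocationIdToSlot.put(allocationId, index);
    return true;
}
{code}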
As my experience is with a somewhat old version (1.6.2), it would be good to
discuss what other unexpected behaviors have been seen and which related fixes
have already been incorporated in the mainline.