Xintong Song created FLINK-19151:
------------------------------------

             Summary: Flink does not normalize container resource with correct 
configurations when Yarn FairScheduler is used 
                 Key: FLINK-19151
                 URL: https://issues.apache.org/jira/browse/FLINK-19151
             Project: Flink
          Issue Type: Bug
          Components: Deployment / YARN
    Affects Versions: 1.11.2
            Reporter: Xintong Song


h3. Problem

It is part of the Yarn protocol that requested container resources are normalized before allocation. That means an allocated container may have resources that differ from (are larger than or equal to) what was requested.

Currently, Flink matches the allocated containers to the original requests by reading the Yarn configuration and calculating how the requested resources should be normalized.

What has been overlooked is that Yarn FairScheduler (and its subclass SLSFairScheduler) overrides the normalization behavior. To be specific,
 * By default, Yarn normalizes container resources to integer multiples of "yarn.scheduler.minimum-allocation-[mb|vcores]"
 * FairScheduler normalizes container resources to integer multiples of "yarn.resource-types.[memory-mb|vcores].increment-allocation" (or the deprecated keys "yarn.scheduler.increment-allocation-[mb|vcores]"), while making sure the resources are no less than "yarn.scheduler.minimum-allocation-[mb|vcores]"
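To illustrate the difference, here is a minimal, self-contained sketch of the two rounding rules (this is not Yarn's actual code; the values in main are example configuration settings):

```java
public class YarnNormalization {

    // Round value up to the nearest integer multiple of step.
    static long roundUp(long value, long step) {
        return ((value + step - 1) / step) * step;
    }

    // Default behavior: round up to multiples of
    // "yarn.scheduler.minimum-allocation-mb".
    static long normalizeDefault(long requestedMb, long minAllocationMb) {
        return Math.max(roundUp(requestedMb, minAllocationMb), minAllocationMb);
    }

    // FairScheduler behavior: round up to multiples of the increment-allocation
    // value, but never below the minimum allocation.
    static long normalizeFair(long requestedMb, long incrementMb, long minAllocationMb) {
        return Math.max(roundUp(requestedMb, incrementMb), minAllocationMb);
    }

    public static void main(String[] args) {
        // A 1500 MB request with minimum-allocation-mb=1024:
        System.out.println(normalizeDefault(1500, 1024)); // 2048
        // The same request under FairScheduler with increment-allocation-mb=512:
        System.out.println(normalizeFair(1500, 512, 1024)); // 1536
    }
}
```

With the same request and configuration, the two schedulers hand back containers of different sizes, which is why a single normalization formula on the Flink side fails to match.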

h3. Proposal for short term solution

To fix this problem, a quick and easy way is to also read the Yarn configuration to learn which scheduler is used, and perform the normalization calculations accordingly. This should be good enough to cover the behaviors of all the schedulers that Yarn currently provides. The limitation is that Flink will not be able to deal with custom Yarn schedulers that override the normalization behavior.

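Sketched below is what that scheduler-dependent choice could look like. The helper name is hypothetical, but the scheduler class names are the real ones configured via "yarn.resourcemanager.scheduler.class":

```java
public class NormalizationStep {

    // Hypothetical helper: picks the value Yarn will round requests up to,
    // based on the configured scheduler class. FairScheduler (and its subclass
    // SLSFairScheduler) round to the increment-allocation value; other
    // schedulers round to the minimum allocation.
    static long roundingStepMb(String schedulerClass, long minAllocationMb, long incrementMb) {
        return schedulerClass.endsWith("FairScheduler") ? incrementMb : minAllocationMb;
    }

    public static void main(String[] args) {
        String capacity = "org.apache.hadoop.yarn.server.resourcemanager"
                + ".scheduler.capacity.CapacityScheduler";
        String fair = "org.apache.hadoop.yarn.server.resourcemanager"
                + ".scheduler.fair.FairScheduler";
        System.out.println(roundingStepMb(capacity, 1024, 512)); // 1024
        System.out.println(roundingStepMb(fair, 1024, 512));     // 512
    }
}
```

Matching on the class-name suffix also covers SLSFairScheduler, but as noted above, a custom scheduler with its own normalization would still be mis-handled.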
h3. Proposal for long term solution

In the long term, it would be good to use Yarn's ContainerRequest#allocationRequestId to match the allocated containers with the original requests, so that Flink no longer needs to understand how Yarn normalizes container resources.

Yarn ContainerRequest#allocationRequestId was introduced in Hadoop 2.9, while at the moment Flink claims to be compatible with Hadoop 2.4+. Therefore, this solution would not work on its own right now.

Another idea is to support various Hadoop versions with different container matching logic. We can abstract the container matching logic into a dedicated component and provide different implementations for it. This would allow Flink to take advantage of newer versions (e.g., working well with custom schedulers), while staying compatible with older versions, albeit without those advantages.
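As a rough sketch of what such a dedicated component could look like (all names here are hypothetical, and real implementations would operate on Yarn's Container/ContainerRequest types rather than plain longs):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical matching abstraction; one implementation per Hadoop capability.
interface ContainerMatcher {
    Optional<Long> match(long allocatedMemMb, long echoedRequestId,
                         Map<Long, Long> pendingMemMbById);
}

// Pre-2.9 fallback: compare the allocated resource against the pending
// requests' normalized resources.
class ResourceBasedMatcher implements ContainerMatcher {
    @Override
    public Optional<Long> match(long allocatedMemMb, long ignoredId,
                                Map<Long, Long> pendingMemMbById) {
        return pendingMemMbById.entrySet().stream()
                .filter(e -> e.getValue() == allocatedMemMb)
                .map(Map.Entry::getKey)
                .findFirst();
    }
}

// Hadoop 2.9+: rely on the allocationRequestId Yarn echoes back, so no
// knowledge of the scheduler's normalization is needed.
class IdBasedMatcher implements ContainerMatcher {
    @Override
    public Optional<Long> match(long allocatedMemMb, long echoedRequestId,
                                Map<Long, Long> pendingMemMbById) {
        return pendingMemMbById.containsKey(echoedRequestId)
                ? Optional.of(echoedRequestId) : Optional.empty();
    }
}

public class MatcherDemo {
    public static void main(String[] args) {
        Map<Long, Long> pending = new HashMap<>();
        pending.put(7L, 2048L); // request id 7 was normalized to 2048 MB
        System.out.println(new ResourceBasedMatcher().match(2048, -1, pending)); // Optional[7]
        System.out.println(new IdBasedMatcher().match(2048, 7, pending));        // Optional[7]
    }
}
```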

Given that we need the resource-based matching anyway for the old Hadoop versions, and the cost of maintaining two sets of matching logic, I tend to think of this approach as a back-up option, to be worked on when we indeed see a need for it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
