[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhijiang updated FLINK-11082:
-----------------------------
    Description: 
The backlog of a subpartition should indicate how many buffers are consumable,
so that the consumer can feed back the corresponding credits for transporting
these buffers. But in the current PipelinedSubpartition implementation, the
backlog is increased by 1 when a BufferConsumer is added into
PipelinedSubpartition, and decreased by 1 when a BufferConsumer is removed from
PipelinedSubpartition. So the backlog only reflects how many buffers are
retained in PipelinedSubpartition, which is not always equal to the number of
consumable buffers.
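To make the mismatch concrete, here is a minimal, hypothetical Java model of
the counting described above. PreFixBacklog and its methods are illustrative
names, not Flink's actual fields. It assumes the writer only ever appends to the
last queued BufferConsumer, so every buffer except the last one is already
finished and, without a flush, consumable.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the pre-fix counting; not Flink's PipelinedSubpartition.
// Assumption: only the last queued buffer can still be written to.
class PreFixBacklog {
    private final Deque<String> buffers = new ArrayDeque<>();
    private int backlog;

    void add(String bufferConsumer) {
        buffers.add(bufferConsumer);
        backlog++; // counted on add, even if the writer is still filling it
    }

    String poll() {
        String polled = buffers.poll();
        if (polled != null) {
            backlog--; // uncounted on removal
        }
        return polled;
    }

    int reportedBacklog() {
        return backlog;
    }

    // Buffers that could actually be shipped now, assuming no flush was requested.
    int consumableWithoutFlush() {
        return Math.max(buffers.size() - 1, 0);
    }

    public static void main(String[] args) {
        PreFixBacklog sub = new PreFixBacklog();
        sub.add("b1"); // finished once the writer moves on to b2
        sub.add("b2"); // still being filled
        // prints "reported=2, consumable=1"
        System.out.println("reported=" + sub.reportedBacklog()
                + ", consumable=" + sub.consumableWithoutFlush());
    }
}
```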

This inconsistency may cause floating buffers to be misallocated on the
consumer side: the consumer requests floating buffers based on the backlog
value, so a floating buffer might sit unused in RemoteInputChannel for a long
time after it was requested.
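The sketch below illustrates the effect under stated assumptions: each
receiving channel owns a fixed number of exclusive buffers (its initial credit)
and, when a backlog is announced, tries to hold backlog + initialCredit buffers
by taking floating buffers from a pool shared by the input gate. ReceiverChannel
and FloatingPool are hypothetical simplifications, not Flink's API, and the
exact request rule is an assumption.

```java
// Hypothetical, simplified model of floating-buffer requests driven by backlog.
// Assumption: a channel wants (backlog + initialCredit) buffers in hand.
class ReceiverChannel {
    private final int initialCredit; // exclusive buffers owned by this channel
    private final FloatingPool pool; // floating buffers shared by all channels
    private int available;           // exclusive + floating buffers held

    ReceiverChannel(int initialCredit, FloatingPool pool) {
        this.initialCredit = initialCredit;
        this.pool = pool;
        this.available = initialCredit;
    }

    // Returns how many floating buffers were taken for this announcement.
    int onSenderBacklog(int backlog) {
        int required = backlog + initialCredit;
        int taken = 0;
        while (available < required && pool.tryTake()) {
            available++;
            taken++;
        }
        return taken;
    }

    public static void main(String[] args) {
        FloatingPool pool = new FloatingPool(1);
        ReceiverChannel a = new ReceiverChannel(2, pool);
        ReceiverChannel b = new ReceiverChannel(2, pool);
        // A's only queued buffer is unfinished, but the retained count reports 1.
        int takenByA = a.onSenderBacklog(1);
        // B has one genuinely consumable buffer, but the pool is already empty.
        int takenByB = b.onSenderBacklog(1);
        System.out.println("A took " + takenByA + ", B took " + takenByB);
    }
}

// Hypothetical shared pool of floating buffers, counted rather than allocated.
class FloatingPool {
    private int free;

    FloatingPool(int free) {
        this.free = free;
    }

    boolean tryTake() {
        if (free == 0) {
            return false;
        }
        free--;
        return true;
    }
}
```

Running it prints "A took 1, B took 0": A holds a floating buffer it cannot use
yet, while B's request goes unserved. If A announced its consumable backlog (0)
instead, it would take nothing and B's request would be satisfied.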

As for the solution: the last buffer in PipelinedSubpartition can only become
consumable when a flush is triggered or the partition is finished. So we could
calculate the backlog precisely based on the partition's flushed/finished
state.
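A minimal sketch of this calculation, again assuming that only the last queued
buffer can still be written to: the backlog counts every retained buffer once a
flush is requested or the partition is finished, and otherwise excludes the
last one. ConsumableBacklog and its methods are hypothetical, and when exactly a
pending flush is cleared is a simplification made for this sketch, not taken
from Flink.

```java
// Hypothetical sketch of a backlog derived from flush/finish state.
// Assumption: only the last queued buffer may still be written to.
class ConsumableBacklog {
    private int queued;             // buffers retained in the subpartition
    private boolean flushRequested; // a flush covers the current last buffer
    private boolean finished;       // no more data will be written

    void add() {
        queued++;
        // A new buffer implies the previous one is finished, so the pending flush
        // no longer needs to cover it; the new last buffer is not flushed yet.
        flushRequested = false;
    }

    void poll() {
        if (queued > 0) {
            queued--;
        }
        if (queued == 0) {
            flushRequested = false; // simplification: nothing left to flush
        }
    }

    void flush() {
        flushRequested = true;
    }

    void finish() {
        finished = true;
    }

    int backlog() {
        if (flushRequested || finished) {
            return queued; // the last buffer may be shipped as well
        }
        return Math.max(queued - 1, 0); // the last buffer is still being written
    }

    public static void main(String[] args) {
        ConsumableBacklog sub = new ConsumableBacklog();
        sub.add();
        sub.add();
        System.out.println(sub.backlog()); // 1: only the first buffer is finished
        sub.flush();
        System.out.println(sub.backlog()); // 2: the flush makes the last one consumable
        sub.add();
        System.out.println(sub.backlog()); // 2: the new last buffer is not flushed
        sub.finish();
        System.out.println(sub.backlog()); // 3: a finished partition ships everything
    }
}
```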

  was:
The backlog should indicate how many buffers in the subpartition are available
for downstream consumption. Availability depends on two factors: either the
{{BufferConsumer}} is finished, or a flush is triggered.

In the current implementation, the backlog is increased as soon as a
{{BufferConsumer}} is added into the subpartition, although this
{{BufferConsumer}} is not yet available for network transport.

Furthermore, the backlog affects how floating buffers are requested on the
downstream side. Some floating buffers are fetched in advance but not used for
a long time, so the floating buffers are not used efficiently.

We found this scenario to be most pronounced with the rebalance selector on the
upstream side, so we want to change the backlog to be increased only when a
{{BufferConsumer}} is finished or a flush is triggered.


> Fix the calculation of backlog in PipelinedSubpartition
> -------------------------------------------------------
>
>                 Key: FLINK-11082
>                 URL: https://issues.apache.org/jira/browse/FLINK-11082
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Network
>    Affects Versions: 1.5.6, 1.6.3, 1.7.1, 1.8.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
