[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zhijiang updated FLINK-11082:
-----------------------------
Description:
The backlog of a subpartition should indicate how many buffers are consumable, so that the consumer can feed back the corresponding credits for transporting these buffers. In the current PipelinedSubpartition implementation, however, the backlog is increased by 1 when a BufferConsumer is added to PipelinedSubpartition and decreased by 1 when a BufferConsumer is removed from it. The backlog therefore only reflects how many buffers are retained in PipelinedSubpartition, which is not always equal to the number of consumable buffers.

This inconsistency can cause floating buffers to be misdistributed on the consumer side. The consumer requests floating buffers based on the backlog value, so a floating buffer might stay unused in RemoteInputChannel for a long time after it is requested.

As for the solution, the last buffer in PipelinedSubpartition can only be consumable once a flush is triggered or the partition is finished. We can therefore calculate the backlog precisely based on these flushed/finished conditions.

was:
The backlog should indicate how many buffers in the subpartition are available for downstream consumption. Availability depends on two factors: the {{BufferConsumer}} being finished, and a flush being triggered. In the current implementation, the backlog is increased as soon as a {{BufferConsumer}} is added to the subpartition, even though this {{BufferConsumer}} is not yet available for network transport. Furthermore, the backlog affects how floating buffers are requested on the downstream side. As a result, some floating buffers are fetched in advance but remain unused for a long time, so floating buffers are not used efficiently. We found this scenario to be especially severe with the rebalance selector on the upstream side, so we want to change the backlog to be increased only when a {{BufferConsumer}} is finished or a flush is triggered.
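The difference between the two backlog definitions can be illustrated with a minimal Java sketch. It assumes a deliberately simplified model: buffers are opaque strings handed out whole, and appending a new buffer means the previous one is complete, so only the last buffer depends on a flush or on the partition being finished. The class `BacklogSketch` and all of its methods are hypothetical and are not Flink's actual PipelinedSubpartition API.

```java
import java.util.ArrayDeque;

// Hypothetical, simplified model of a pipelined subpartition queue (not Flink's real class).
public class BacklogSketch {

    // Queued buffers; assumption: only the last one may still be open for writing.
    private final ArrayDeque<String> buffers = new ArrayDeque<>();
    private boolean flushRequested;
    private boolean finished;

    void add(String buffer) {
        if (finished) {
            throw new IllegalStateException("partition already finished");
        }
        buffers.add(buffer);
        // Assumption: appending completes the previous buffer; the new last buffer is unflushed.
        flushRequested = false;
    }

    void flush() {
        if (!buffers.isEmpty()) {
            flushRequested = true;
        }
    }

    void finish() {
        finished = true;
    }

    // Behaviour criticized in the issue: every retained buffer counts.
    int retainedBacklog() {
        return buffers.size();
    }

    // Proposed behaviour: the last buffer counts only after a flush or once the partition is finished.
    int consumableBacklog() {
        if (buffers.isEmpty()) {
            return 0;
        }
        return (flushRequested || finished) ? buffers.size() : buffers.size() - 1;
    }

    // Hands out the head buffer only if it is consumable; returns null otherwise.
    String poll() {
        if (consumableBacklog() == 0) {
            return null;
        }
        String head = buffers.poll();
        if (buffers.isEmpty()) {
            flushRequested = false; // all flushed data has been drained
        }
        return head;
    }

    public static void main(String[] args) {
        BacklogSketch sub = new BacklogSketch();
        sub.add("b1");
        sub.add("b2"); // b1 is complete, b2 may still be written to
        System.out.println(sub.retainedBacklog() + " vs " + sub.consumableBacklog()); // 2 vs 1
        sub.flush();
        System.out.println(sub.retainedBacklog() + " vs " + sub.consumableBacklog()); // 2 vs 2
    }
}
```

With the retained count, a consumer that sizes its floating-buffer requests from the backlog would ask for one buffer per subpartition that it cannot fill until the writer flushes or finishes, which is the misdistribution described above.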
> Fix the calculation of backlog in PipelinedSubpartition
> -------------------------------------------------------
>
>                 Key: FLINK-11082
>                 URL: https://issues.apache.org/jira/browse/FLINK-11082
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Network
>    Affects Versions: 1.5.6, 1.6.3, 1.7.1, 1.8.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h