[ 
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739998#comment-16739998
 ] 

zhijiang commented on FLINK-11082:
----------------------------------

[~pnowojski], thanks for the reply!

It might not make sense to increase the backlog when adding a new buffer based on 
whether the current queue is empty or not. There are two concerns:

1. If the queue contains only event buffers, the backlog should not be increased 
when the first data buffer is added. So another boolean flag might be needed to 
indicate whether the added data buffer is the first one or not.

2. Take another case, ignoring the flusher and event factors:
 * The writer adds the first buffer to the queue; the current backlog is 0.
 * The writer adds the second buffer, increasing the backlog to 1 and triggering 
the data-available notification for transport.
 * The netty thread pops the first buffer from the queue, decreasing the backlog 
to 0. Note that the second buffer might already be finished by the writer while 
the third buffer has not been added to the queue yet, so the subpartition is 
still available for transport although the backlog is currently 0.
 * The netty thread pops the second buffer from the queue, decreasing the 
backlog to -1. After that, the third buffer is added to the queue, increasing 
the backlog to 0, leaving only one unfinished buffer in the queue. So the value 
of the backlog might be -1 sometimes.

When one buffer is finished by the writer, the backlog should be increased in 
theory, but the increase is delayed until the next buffer is added. Before the 
backlog is increased, the netty thread can already see the finished buffer and 
decrease the backlog in advance. This causes the problem above.
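To make the interleaving concrete, here is a minimal, single-threaded model of 
the accounting described above. It is only a sketch: the class and method names 
are illustrative, and the synchronization of the real {{PipelinedSubpartition}} 
is omitted.

{code:java}
import java.util.ArrayDeque;

public class BacklogUnderflowDemo {
    private final ArrayDeque<String> queue = new ArrayDeque<>();
    private int backlog = 0;

    // Writer side: adding buffer N implies buffer N-1 is finished, so the
    // backlog is increased for every add except the very first one.
    void add(String buffer) {
        if (!queue.isEmpty()) {
            backlog++;
        }
        queue.add(buffer);
        System.out.println("add " + buffer + ", backlog = " + backlog);
    }

    // Netty side: popping a buffer decreases the backlog.
    void poll() {
        String buffer = queue.poll();
        backlog--;
        System.out.println("poll " + buffer + ", backlog = " + backlog);
    }

    public static void main(String[] args) {
        BacklogUnderflowDemo p = new BacklogUnderflowDemo();
        p.add("buffer-1");   // backlog = 0
        p.add("buffer-2");   // backlog = 1, data available is notified
        p.poll();            // backlog = 0; buffer-2 may already be finished
        p.poll();            // backlog = -1: netty saw the finished buffer
                             // before the writer added buffer-3
        p.add("buffer-3");   // backlog = 0, one unfinished buffer in queue
    }
}
{code}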

If the flusher factor is considered, the conditions get more complicated. I also 
considered some other ways before. One way to solve the above issue is to 
introduce another {{notifyBufferBuilderFinished}} method in the 
{{ResultPartitionWriter}} interface. {{RecordWriter}} should first call 
{{notifyBufferBuilderFinished}} before calling {{BufferBuilder#finish()}}, and 
the backlog is increased in the {{notifyBufferBuilderFinished}} implementation. 
In other words, the backlog should always be increased first and only then 
decreased.
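A sketch of that ordering is below. Note that {{notifyBufferBuilderFinished}} is 
the proposed new method, not an existing Flink API, and the surrounding classes 
are reduced stand-ins for Flink's {{RecordWriter}} and {{BufferBuilder}}.

{code:java}
// Reduced stand-in for Flink's interface with the proposed new method.
interface ResultPartitionWriter {
    // Proposed: increase the backlog for the given subpartition *before*
    // the buffer builder is marked finished.
    void notifyBufferBuilderFinished(int targetChannel);
}

class BufferBuilder {
    void finish() {
        // Marks the underlying memory segment as finished so the netty
        // thread may consume it fully.
    }
}

class RecordWriter {
    private final ResultPartitionWriter targetPartition;

    RecordWriter(ResultPartitionWriter targetPartition) {
        this.targetPartition = targetPartition;
    }

    void closeBufferBuilder(BufferBuilder bufferBuilder, int targetChannel) {
        // Increase the backlog first ...
        targetPartition.notifyBufferBuilderFinished(targetChannel);
        // ... then finish the builder, so the netty thread can never see a
        // finished buffer whose backlog increase has not happened yet.
        bufferBuilder.finish();
    }
}
{code}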

Further considering the flusher factor in the above proposal (see the sketch 
after this list):
 * When a flush is triggered, whether to increase the backlog depends on whether 
the last buffer is finished or not. If there are 2 buffers in the queue and the 
second buffer is already finished, the backlog stays 2 when the flush is 
triggered. If the second buffer is not finished, the flush increases the backlog 
from 1 to 2.
 * In {{notifyBufferBuilderFinished}}, if {{flushRequested}} is true, the 
backlog should not be increased, because the flush already did that before; 
otherwise increase the backlog. E.g. when the second buffer is finished and a 
flush was already triggered before, the backlog should still be 2.
 * When a new buffer is added to the queue and {{flushRequested}} is true, the 
backlog should be increased to reflect the flush. E.g. when the third buffer is 
added and {{flushRequested}} is true, the current backlog should increase from 
2 to 3.
 * When the last buffer in the queue is peeked, no matter whether it is finished 
or not, the backlog becomes 0 and {{flushRequested}} becomes false.
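A rough sketch of these four rules on a simplified subpartition, assuming 
illustrative field and method names; the partial reads of an unfinished last 
buffer that the real {{PipelinedSubpartition}} performs are omitted:

{code:java}
import java.util.ArrayDeque;

class FlushAwareSubpartition {
    static class Buffer {
        boolean finished;
    }

    private final ArrayDeque<Buffer> queue = new ArrayDeque<>();
    private int backlog = 0;
    private boolean flushRequested = false;

    // Rule 1: a flush counts the last buffer only if it is unfinished;
    // finished buffers were already counted via notifyBufferBuilderFinished.
    synchronized void flush() {
        if (!flushRequested && !queue.isEmpty() && !queue.peekLast().finished) {
            backlog++;
        }
        flushRequested = !queue.isEmpty();
    }

    // Rule 2: finishing the current buffer builder increases the backlog
    // only if a pending flush has not already counted that buffer.
    synchronized void notifyBufferBuilderFinished() {
        if (!flushRequested) {
            backlog++;
        }
    }

    // Rule 3: adding a new buffer while a flush is pending counts the new
    // buffer immediately (e.g. backlog 2 -> 3 for the third buffer).
    synchronized void add(Buffer buffer) {
        queue.add(buffer);
        if (flushRequested) {
            backlog++;
        }
    }

    // Rule 4: once the netty thread reaches the last buffer in the queue,
    // finished or not, the backlog drops to 0 and the flush is consumed.
    synchronized Buffer pollBuffer() {
        Buffer buffer = queue.peek();
        if (buffer == null) {
            return null;
        }
        if (queue.size() == 1) {
            backlog = 0;
            flushRequested = false;
        } else {
            backlog--;
        }
        if (buffer.finished) {
            queue.poll(); // only fully finished buffers leave the queue
        }
        return buffer;
    }
}
{code}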

This makes sense for {{PipelinedSubpartition}}, but it adds another separate 
synchronization for increasing the backlog, whereas the current way already 
increases it inside the synchronization while adding a buffer. For 
{{SpillableSubpartition}} I would need to think it through a bit more.

> Increase backlog only if it is available for consumption
> --------------------------------------------------------
>
>                 Key: FLINK-11082
>                 URL: https://issues.apache.org/jira/browse/FLINK-11082
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>    Affects Versions: 1.5.6, 1.6.3, 1.7.1, 1.8.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Major
>
> The backlog should indicate how many buffers in the subpartition are 
> available for the downstream's consumption. The availability is determined 
> by two factors: one is that the {{BufferConsumer}} is finished, and the 
> other is that a flush has been triggered.
> In the current implementation, the backlog is increased as soon as the 
> {{BufferConsumer}} is added into the subpartition, even though this 
> {{BufferConsumer}} is not yet available for network transport.
> Furthermore, the backlog affects requesting floating buffers on the 
> downstream side. That means some floating buffers are fetched in advance 
> but not used for a long time, so the floating buffers are not made use of 
> efficiently.
> We found this scenario especially with the rebalance selector on the 
> upstream side, so we want to change the backlog to be increased only when 
> a {{BufferConsumer}} is finished or a flush is triggered.


