[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739998#comment-16739998 ]
zhijiang commented on FLINK-11082:
----------------------------------

[~pnowojski], thanks for the replies! It might not make sense to increase the backlog when adding a new buffer based on whether the current queue is empty. There are two concerns:

1. If the queue contains only event buffers, the backlog should not be increased when the first data buffer is added. So another boolean flag might be needed to indicate whether the added data buffer is the first one.

2. Consider another case, ignoring the flusher and event factors:
* The writer adds the first buffer to the queue; the current backlog is 0.
* The writer adds the second buffer, increasing the backlog to 1 and triggering the data-available notification for transport.
* The netty thread pops the first buffer from the queue, decreasing the backlog to 0. Note that the second buffer may already have been finished by the writer, even though the third buffer has not been added to the queue yet. The subpartition is therefore still available for transport while the backlog is 0.
* The netty thread pops the second buffer, decreasing the backlog to -1. Only afterwards is the third buffer added, increasing the backlog back to 0 and leaving one unfinished buffer in the queue.

So the backlog can sometimes be -1. In theory, the backlog should be increased as soon as a buffer is finished by the writer, but with this approach the increase is delayed until the next buffer is added. Before the backlog is increased, the netty thread can already see the finished buffer and decrease the backlog, which causes the problem above. If the flusher factor is also considered, the conditions become even more complicated.

I also considered some other approaches before. One way to solve the above issue is to introduce a {{notifyBufferBuilderFinished}} method in the {{ResultPartitionWriter}} interface. In {{RecordWriter}}, {{notifyBufferBuilderFinished}} would be called before {{BufferBuilder#finish()}}, and the backlog would be increased in the {{notifyBufferBuilderFinished}} implementation.
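The race in concern 2 can be reproduced with a minimal single-threaded sketch. This is not Flink code: {{BacklogSketch}}, {{add}} and {{poll}} are invented names, and the rule "increase the backlog on add whenever a previous buffer exists" is my simplified reading of the proposal under discussion; the two {{poll}} calls stand for the netty thread running between the writer's adds.

```java
import java.util.ArrayDeque;

/**
 * Minimal sketch (hypothetical names) of the counting rule under discussion:
 * the backlog is increased when a buffer is added and a previous buffer
 * exists (that previous buffer is implicitly finished at that point), and
 * decreased when the netty thread polls a buffer.
 */
class BacklogSketch {
    private final ArrayDeque<String> queue = new ArrayDeque<>();
    private boolean firstBufferAdded;
    private int backlog;

    void add(String buffer) {
        if (firstBufferAdded) {
            backlog++; // the previously added buffer counts as finished now
        }
        firstBufferAdded = true;
        queue.add(buffer);
    }

    String poll() {
        backlog--; // netty thread consumes one (finished) buffer
        return queue.poll();
    }

    int getBacklog() {
        return backlog;
    }

    public static void main(String[] args) {
        BacklogSketch sp = new BacklogSketch();
        sp.add("buffer1");                   // backlog = 0
        sp.add("buffer2");                   // backlog = 1, notify data available
        sp.poll();                           // netty pops buffer1 -> backlog = 0
        // buffer2 is already finished by the writer, so the netty thread may
        // poll again before buffer3 is added:
        sp.poll();                           // netty pops buffer2 -> backlog = -1
        System.out.println(sp.getBacklog()); // prints -1
        sp.add("buffer3");                   // backlog back to 0
        System.out.println(sp.getBacklog()); // prints 0
    }
}
```

The negative value appears precisely because the decrease (netty sees a finished buffer) can run before the delayed increase (the next add).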
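Under the increase-first proposal, the flush interaction can also be sketched. Again this is only an illustration, not Flink code: {{FlushAwareBacklog}}, {{addBufferConsumer}} and the internal {{lastBufferFinished}} flag are invented, and the rules encoded here are one possible reading of how {{notifyBufferBuilderFinished}} and {{flushRequested}} could cooperate so that each buffer's increase happens exactly once.

```java
/**
 * Hypothetical sketch of flush-aware backlog counting (names invented).
 * Each buffer should cause exactly one backlog increase, performed by
 * whichever of notifyBufferBuilderFinished() / flush() / addBufferConsumer()
 * first makes it available for consumption.
 */
class FlushAwareBacklog {
    private int backlog;
    private boolean flushRequested;
    private boolean lastBufferFinished = true;
    private int buffersInQueue;

    /** Writer calls this before BufferBuilder#finish(). */
    void notifyBufferBuilderFinished() {
        if (!flushRequested) {
            backlog++; // flush has not already counted this buffer
        }
        lastBufferFinished = true;
    }

    /** Flusher makes the (possibly unfinished) tail buffer available. */
    void flush() {
        if (!flushRequested && buffersInQueue > 0) {
            if (!lastBufferFinished) {
                backlog++; // unfinished tail becomes available now
            }
            flushRequested = true;
        }
    }

    /** Writer adds a new BufferConsumer to the queue. */
    void addBufferConsumer() {
        if (flushRequested) {
            backlog++; // new tail is immediately available under an active flush
        }
        buffersInQueue++;
        lastBufferFinished = false;
    }

    int getBacklog() {
        return backlog;
    }

    public static void main(String[] args) {
        FlushAwareBacklog sp = new FlushAwareBacklog();
        sp.addBufferConsumer();              // buffer1
        sp.notifyBufferBuilderFinished();    // backlog = 1
        sp.addBufferConsumer();              // buffer2, still unfinished
        sp.flush();                          // unfinished tail -> backlog = 2
        sp.notifyBufferBuilderFinished();    // flushRequested -> stays 2
        sp.addBufferConsumer();              // buffer3 under flush -> backlog = 3
        System.out.println(sp.getBacklog()); // prints 3
    }
}
```

Note the increase always precedes any possible decrease by the netty thread, which is the point of the increase-first rule.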
In other words, the backlog should always be increased first and decreased afterwards. Considering the flusher factor further in my above proposal:
* When a flush is triggered, whether to increase the backlog depends on whether the last buffer is finished. If there are 2 buffers in the queue and the second buffer is already finished, the backlog stays 2 when the flush is triggered. If the second buffer is not finished, the flush increases the backlog from 1 to 2.
* In {{notifyBufferBuilderFinished}}, if {{flushRequested}} is true, the backlog should not be increased because the flush already did so; otherwise it is increased. E.g. when the second buffer is finished and a flush was already triggered before, the backlog should still be 2.
* When a new buffer is added to the queue while {{flushRequested}} is true, the backlog should be increased to reflect the flush. E.g. when the third buffer is added and {{flushRequested}} is true, the backlog should increase from 2 to 3.
* When the last buffer in the queue is peeked, whether it is finished or not, the backlog becomes 0 and {{flushRequested}} becomes false.

This makes sense for {{PipelinedSubpartition}}, but it adds a separate synchronization for increasing the backlog, whereas the current approach already performs the increase inside the synchronization when adding a buffer. For {{SpillableSubpartition}} I need to think it through a bit more.

> Increase backlog only if it is available for consumption
> --------------------------------------------------------
>
>                 Key: FLINK-11082
>                 URL: https://issues.apache.org/jira/browse/FLINK-11082
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>    Affects Versions: 1.5.6, 1.6.3, 1.7.1, 1.8.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Major
>
> The backlog should indicate how many buffers are available in the subpartition for downstream consumption. The availability is determined by two factors: one is the {{BufferConsumer}} being finished, and the other is a flush being triggered.
> In the current implementation, the backlog is increased as soon as a {{BufferConsumer}} is added to the subpartition, even though that {{BufferConsumer}} is not yet available for network transport.
> Furthermore, the backlog affects the requesting of floating buffers on the downstream side. That means some floating buffers are fetched in advance but not used for a long time, so the floating buffers are not used efficiently.
> We found this scenario to be especially pronounced with the rebalance selector on the upstream side, so we want to change the backlog to be increased only when a {{BufferConsumer}} is finished or a flush is triggered.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)