[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption
[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16740056#comment-16740056 ]

zhijiang commented on FLINK-11082:
----------------------------------

Regarding the other issue you mentioned, I do not think the higher CPU usage and packet count are caused by this bug. Although the backlog is increased as soon as a buffer is added to the queue, that partial buffer is not sent over the network. Whether a subpartition is available for transport depends on two conditions that must both hold:
* Data available: a buffer is finished or a flush was triggered.
* Credit available: credit > 0 or the next element is an event.

So the data-available condition in {{PipelinedSubpartition#isAvailable()}} might not be satisfied even though the increased backlog makes credit available.

Your other point is partially true. We always try to assign two additional credits to the sender on a best-effort basis, so we can assume the sender never has to wait for a credit notification when data becomes available. But these two additional credits may come from exclusive or floating buffers, and there might be fewer than two. E.g. if the exclusive buffers are already used and queued in the input channel, and the pool does not have enough floating buffers, then the sender might get only one or zero additional credits when idle.

Do you remember whether the problem the user reported was for {{PipelinedSubpartition}} or {{SpillableSubpartition}}? For the pipelined case I cannot currently think of anything that would cause it. But for the blocking case, I need to double check whether the current implementation might send more {{BufferResponse}} messages in total than before.
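The two transport conditions described in the comment above can be sketched as a pair of boolean checks. This is only a minimal illustration: the class {{TransportAvailabilitySketch}} and its method and parameter names are hypothetical and do not reproduce the actual {{PipelinedSubpartition}} code.

```java
/** Hypothetical, simplified model of the transport check; not Flink's actual code. */
final class TransportAvailabilitySketch {

    /** Data is available once the head buffer is finished or a flush was triggered. */
    static boolean dataAvailable(boolean headBufferFinished, boolean flushRequested) {
        return headBufferFinished || flushRequested;
    }

    /** Credit is available if the receiver announced credit or the next element is an event. */
    static boolean creditAvailable(int credit, boolean nextIsEvent) {
        return credit > 0 || nextIsEvent;
    }

    /** Both conditions must hold before anything is sent. */
    static boolean canTransport(boolean headBufferFinished, boolean flushRequested,
                                int credit, boolean nextIsEvent) {
        return dataAvailable(headBufferFinished, flushRequested)
                && creditAvailable(credit, nextIsEvent);
    }

    public static void main(String[] args) {
        // Freshly added partial buffer: credit exists, but no data is available yet.
        System.out.println(canTransport(false, false, 2, false)); // false
        // Finished buffer with credit.
        System.out.println(canTransport(true, false, 1, false));  // true
        // Flushed partial buffer without credit.
        System.out.println(canTransport(false, true, 0, false));  // false
    }
}
```

Under this model, a backlog that was bumped too early can at most cause credit to be announced early; it cannot by itself cause a partial buffer to be sent.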
> Increase backlog only if it is available for consumption
> --------------------------------------------------------
>
>                 Key: FLINK-11082
>                 URL: https://issues.apache.org/jira/browse/FLINK-11082
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>    Affects Versions: 1.5.6, 1.6.3, 1.7.1, 1.8.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Major
>
> The backlog should indicate how many buffers are available in subpartition
> for downstream's consumption. The availability is considered from two
> factors. One is {{BufferConsumer}} finished, and the other is flush triggered.
> In current implementation, when the {{BufferConsumer}} is added into the
> subpartition, then the backlog is increased as a result, but this
> {{BufferConsumer}} is not yet available for network transport.
> Furthermore, the backlog would affect requesting floating buffers on
> downstream side. That means some floating buffers are fetched in advance but
> not be used for long time, so the floating buffers are not made use of
> efficiently.
> We found this scenario extremely for rebalance selector on upstream side, so
> we want to change when to increase backlog by finishing {{BufferConsumer}} or
> flush triggered.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption
[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739998#comment-16739998 ]

zhijiang commented on FLINK-11082:
----------------------------------

[~pnowojski], thanks for the replies!

It might not make sense to decide whether to increase the backlog when adding a new buffer based on whether the current queue is empty. There are two concerns:

1. If only event buffers exist in the queue, the backlog should not be increased when the first data buffer is added. So it might need another boolean flag to indicate whether the added data buffer is the first one.

2. Take another case that ignores the flusher and events:
* The writer adds the first buffer to the queue, and the current backlog should be 0.
* The writer adds the second buffer, which increases the backlog to 1 and triggers the data-available notification for transport.
* The netty thread pops the first buffer from the queue, decreasing the backlog to 0. Note that the second buffer might already be finished by the writer while the third buffer has not been added to the queue yet. The subpartition is then still available for transport although the backlog is currently 0.
* The netty thread pops the second buffer from the queue, decreasing the backlog to -1. After that the third buffer is added to the queue, increasing the backlog to 0. Only one unfinished buffer is left in the queue.

So the value of the backlog might sometimes be -1. When the writer finishes a buffer, the backlog should in theory be increased. But the increase is delayed until the next buffer is added. Before the backlog is increased, the netty thread can see the finished buffer and decrease the backlog in advance. This causes the above problem. Taking the flusher into account, the conditions seem even more complicated.

I also considered some other ways before. One way to solve the above issue is to introduce another {{notifyBufferBuilderFinished}} method in the {{ResultPartitionWriter}} interface. {{RecordWriter}} should call {{notifyBufferBuilderFinished}} before calling {{BufferBuilder#finish()}}, and the backlog is increased in the {{notifyBufferBuilderFinished}} implementation. In other words, the backlog should always be increased first and then decreased.

Taking the flusher into account in my above proposal:
* When a flush is triggered, whether to increase the backlog depends on whether the last buffer is finished. If there are 2 buffers in the queue and the second buffer is already finished, the backlog is still 2 when the flush is triggered. If the second buffer is not finished, the flush increases the backlog from 1 to 2.
* On {{notifyBufferBuilderFinished}}, if {{flushRequested}} is true, the backlog should not increase because the flush already did that; otherwise increase the backlog. E.g. when the second buffer is finished and a flush was already triggered before, the backlog should still be 2.
* When adding a new buffer to the queue, if {{flushRequested}} is true, the backlog should increase to reflect the flush. E.g. when the third buffer is added and {{flushRequested}} is true, the current backlog should increase from 2 to 3.
* When peeking the last buffer in the queue, no matter whether the buffer is finished, the backlog becomes 0 and {{flushRequested}} becomes false.

This makes sense for {{PipelinedSubpartition}}. But it adds another separate synchronization for increasing the backlog, whereas the current way already happens inside the synchronization while adding the buffer. For {{SpillableSubpartition}} I need to think it through a bit more.
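The ordering problem in the flusher-free case above can be replayed in a single thread. The sketch below is a hedged illustration, not Flink code: {{BacklogOrderingSketch}}, its {{Event}} enum and the {{increaseOnFinish}} switch are hypothetical names, and the "delayed" policy is modelled under the assumption that every add except the very first one increases the backlog. Flush handling is deliberately left out.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical single-threaded replay of the interleaving; not Flink's actual code. */
final class BacklogOrderingSketch {

    enum Event { ADD, FINISH, POLL }

    /** Writer adds and finishes buffers 1 and 2, the netty thread polls both, then buffer 3 is added. */
    static List<Event> exampleTrace() {
        return Arrays.asList(Event.ADD, Event.FINISH, Event.ADD, Event.POLL,
                Event.FINISH, Event.POLL, Event.ADD);
    }

    /** Replays the events and returns the lowest backlog value observed. */
    static int minBacklog(List<Event> events, boolean increaseOnFinish) {
        int backlog = 0;
        int added = 0;
        int min = 0;
        for (Event event : events) {
            switch (event) {
                case ADD:
                    added++;
                    // Delayed policy (assumption): adding buffer k > 1 accounts for buffer k-1.
                    if (!increaseOnFinish && added > 1) {
                        backlog++;
                    }
                    break;
                case FINISH:
                    // Eager policy: stands in for the proposed notifyBufferBuilderFinished hook.
                    if (increaseOnFinish) {
                        backlog++;
                    }
                    break;
                case POLL:
                    backlog--;
                    break;
            }
            min = Math.min(min, backlog);
        }
        return min;
    }

    public static void main(String[] args) {
        System.out.println(minBacklog(exampleTrace(), false)); // -1
        System.out.println(minBacklog(exampleTrace(), true));  // 0
    }
}
```

With the eager policy every decrement is preceded by its matching increment, which is the "increase first, then decrease" property the proposal aims for.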
[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption
[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739385#comment-16739385 ]

Piotr Nowojski commented on FLINK-11082:
----------------------------------------

Another issue. Could this bug explain why one user recently reported higher CPU usage and a 300% increase in the number of packets sent between nodes after upgrading from Flink 1.4? We were already aware that credit-based flow control increases the network traffic/number of messages sent between nodes by 100%. But if we announce fresh partial buffers to the receiver immediately, could it be that a small chunk of that data is sent prematurely, before {{flushRequested}} is set or the next {{BufferConsumer}} is enqueued? Sending a chunk of data prematurely and assigning new credit would explain the remaining unaccounted "200%" of messages being sent.

Btw, [~zjwang], if a channel is idle, two exclusive buffers will be assigned to the sender, so it will have some buffers for immediate use whenever the channel becomes active?
[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption
[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739380#comment-16739380 ]

Piotr Nowojski commented on FLINK-11082:
----------------------------------------

[~zjwang], why should we report a backlog of -1? Shouldn't your last point work like this:
- Peek the last buffer from the queue and set flushRequested to false. Regardless of whether this last buffer is finished, the backlog should be decreased to 0. If the buffer is finished, pop it from the queue. If the buffer is not finished, keep it with a backlog of 0.
- The backlog increases to 1 if we flush or if we enqueue a second buffer.
[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption
[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739246#comment-16739246 ]

zhijiang commented on FLINK-11082:
----------------------------------

Considering the implementation, handling both the finished {{BufferBuilder}} and the flush seems a bit complicated.

Increasing the backlog could be triggered by two conditions: adding a {{BufferConsumer}} from the second one onwards, and a flush being triggered. Decreasing the backlog could also be triggered by two conditions: popping a finished {{BufferConsumer}} from the queue, and {{flushRequested}} being set to false for the last element in the queue. So the backlog might sometimes be {{-1}}. For example:
* Two {{BufferConsumer}}s are in the queue and the second buffer is unfinished, so the current backlog is 1.
* Add one more {{BufferConsumer}} to the queue: three buffers are in the queue, so the backlog is 2.
* A flush is triggered: still three {{BufferConsumer}}s in the queue, so the backlog is 3.
* Pop one {{BufferConsumer}} from the queue: two buffers are left, and the backlog is 2.
* Pop one {{BufferConsumer}} from the queue again: one buffer is left, and the backlog is 1.
* Peek the last buffer from the queue; {{flushRequested}} is set to false. If this last buffer is not finished, the backlog is decreased to 0 and there seems to be no problem. But if the last buffer is already finished, it is popped from the queue and the backlog is decreased again to -1. When a new {{BufferConsumer}} is added to the queue, the backlog increases to 0.

So the backlog might be decreased twice for the last buffer in the queue. Although the backlog is actually correct, the value of -1 might seem strange. What do you think?
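The example trace in the comment above can be replayed with a small counter model. This is a hedged sketch of one reading of the described rules, not Flink's implementation: {{FlushBacklogTraceSketch}} and its methods are hypothetical, and it assumes that every add except the very first increases the backlog, that a flush adds one, and that taking the last element first undoes the flush and then, if the buffer is finished, pops it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical counter model of the described rules; not Flink's actual code. */
final class FlushBacklogTraceSketch {
    private int queued;             // buffers currently in the queue
    private int totalAdded;         // buffers ever added
    private boolean flushRequested;
    private int backlog;

    /** Every add except the very first one increases the backlog (assumption). */
    int add() {
        queued++;
        totalAdded++;
        if (totalAdded > 1) {
            backlog++;
        }
        return backlog;
    }

    /** A flush makes the unfinished tail consumable and increases the backlog once. */
    int flush() {
        if (!flushRequested && queued > 0) {
            flushRequested = true;
            backlog++;
        }
        return backlog;
    }

    /** The consumer takes the head; headFinished only matters for the last element. */
    int poll(boolean headFinished) {
        if (queued > 1) {
            queued--;
            backlog--;
            return backlog;
        }
        if (flushRequested) {       // first decrement: the flush is consumed
            flushRequested = false;
            backlog--;
        }
        if (headFinished) {         // second decrement: the finished buffer is popped
            queued--;
            backlog--;
        }
        return backlog;
    }

    /** Replays the example and records the backlog after every step. */
    static List<Integer> trace(boolean lastBufferFinished) {
        FlushBacklogTraceSketch s = new FlushBacklogTraceSketch();
        List<Integer> values = new ArrayList<>();
        values.add(s.add());                    // first buffer
        values.add(s.add());                    // second buffer: backlog 1
        values.add(s.add());                    // third buffer: backlog 2
        values.add(s.flush());                  // flush: backlog 3
        values.add(s.poll(true));               // backlog 2
        values.add(s.poll(true));               // backlog 1
        values.add(s.poll(lastBufferFinished)); // last element
        values.add(s.add());                    // a new buffer arrives
        return values;
    }

    public static void main(String[] args) {
        System.out.println(trace(false)); // [0, 1, 2, 3, 2, 1, 0, 1]
        System.out.println(trace(true));  // [0, 1, 2, 3, 2, 1, -1, 0]
    }
}
```

The second run shows the double decrement for a finished last buffer, which is where the transient -1 comes from.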
[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption
[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16736890#comment-16736890 ]

Piotr Nowojski commented on FLINK-11082:
----------------------------------------

Just for the record, +1 from my side: this should be fixed.
[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption
[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16736665#comment-16736665 ]

zhijiang commented on FLINK-11082:
----------------------------------

[~NicoK] yes, these two features were developed in parallel for Flink 1.5, which might have resulted in this problem. My initial thought for fixing this issue is just as you said. :)
[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption
[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16736663#comment-16736663 ]

zhijiang commented on FLINK-11082:
----------------------------------

[~pnowojski], your understanding is right. It might be more obvious for a batch job with the flush mechanism disabled and in keyBy mode.
[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption
[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16735835#comment-16735835 ]

Nico Kruber commented on FLINK-11082:
-------------------------------------

Nice find, [~zjwang]. I guess the current behaviour comes from the two changes we were making in parallel: without the low-latency changes, whenever a buffer was added it was ready to consume, which is no longer the case.

{{PipelinedSubpartition#add()}} always increases the backlog for any {{BufferConsumer}}, which is initially empty and not ready for consumption, as noted. Instead, we can increase the backlog when adding a second {{BufferConsumer}} or when flushing, taking care not to increase it too often.

> Increase backlog only if it is available for consumption
> --------------------------------------------------------
>
>                 Key: FLINK-11082
>                 URL: https://issues.apache.org/jira/browse/FLINK-11082
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>    Affects Versions: 1.8.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>
> The backlog should indicate how many buffers are available in subpartition
> for downstream's consumption. The availability is considered from two
> factors. One is {{BufferConsumer}} finished, and the other is flush triggered.
> In current implementation, when the {{BufferConsumer}} is added into the
> subpartition, then the backlog is increased as a result, but this
> {{BufferConsumer}} is not yet available for network transport.
> Furthermore, the backlog would affect requesting floating buffers on
> downstream side. That means some floating buffers are fetched in advance but
> not be used for long time, so the floating buffers are not made use of
> efficiently.
> We found this scenario extremely for rebalance selector on upstream side, so
> we want to change when to increase backlog by finishing {{BufferConsumer}} or
> flush triggered.
[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption
[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16735815#comment-16735815 ]

Piotr Nowojski commented on FLINK-11082:
----------------------------------------

Hmmm, I think I'm starting to understand the issue. Currently, the credit-based model always tries to maintain 2 spare buffers per remote channel. If there are 3 buffers in the backlog that are already used, it will try to acquire and assign 3 floating buffers on top of the 2 exclusive buffers for that channel. That, combined with empty `BufferConsumers` bumping the backlog to 1, means that floating buffers are useless: they are always assigned somewhere (completely randomly), even at very low throughput. Or even with no throughput at all. If output flushing is disabled and we suddenly freeze the production of records for a couple of minutes, no data will be sent and the input queues will be empty, yet because of those "empty" enqueued `BufferConsumers` bumping the backlog to 1, all floating buffers will be assigned and frozen/wasted somewhere. The original intention was to assign floating buffers to "heavily" used channels, and this doesn't happen right now?
[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption
[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16735597#comment-16735597 ]

zhijiang commented on FLINK-11082:
----------------------------------

Thanks for replying, [~pnowojski] :)

If I understood correctly, you described a scenario like this:
# If the backlog is 2 for one input channel and there are currently 2 available exclusive buffers, this input channel requests 2 floating buffers to maintain a total of 4 available credits.
# If one exclusive buffer is used for receiving network data, the current backlog becomes 1 and there are currently 3 available credits.
# If the above exclusive buffer is consumed by the task and then recycled, there are 4 available credits in total, which triggers returning one extra floating buffer directly. There is no need to wait for the other exclusive buffer in front.

That means we always try to maintain {{backlog+initialCredit}} available credits, and if some exclusive buffers are recycled, the corresponding number of floating buffers is returned immediately as a result. The related process is in {{RemoteInputChannel#AvailableBufferQueue#addExclusiveBuffer}}.

So the distribution of floating buffers is currently determined by the backlog size, but the backlog size does not accurately reflect how many buffers are ready for transport in the network stack.
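The three-step scenario above can be followed with a small bookkeeping model of one input channel. This is a minimal sketch under stated assumptions, not the real {{RemoteInputChannel}}: the class {{ChannelCreditSketch}} and all of its methods are hypothetical, and the floating buffer pool is assumed never to run dry.

```java
/** Hypothetical bookkeeping for one input channel; not Flink's actual code. */
final class ChannelCreditSketch {
    private final int initialCredit; // number of exclusive buffers of the channel
    private int exclusive;           // exclusive buffers currently available
    private int floating;            // floating buffers currently held
    private int backlog;             // last backlog announced by the sender

    ChannelCreditSketch(int initialCredit) {
        this.initialCredit = initialCredit;
        this.exclusive = initialCredit;
    }

    int available() { return exclusive + floating; }

    int required() { return backlog + initialCredit; }

    int floating() { return floating; }

    /** Requests floating buffers until backlog + initialCredit credits are available. */
    void onSenderBacklog(int newBacklog) {
        backlog = newBacklog;
        while (available() < required()) {
            floating++; // assumption: the pool always has a floating buffer to give
        }
    }

    /** One exclusive buffer receives network data; the sender reports its new backlog. */
    void useExclusiveBuffer(int senderBacklog) {
        if (exclusive == 0) {
            throw new IllegalStateException("no exclusive buffer available");
        }
        exclusive--;
        onSenderBacklog(senderBacklog);
    }

    /** A recycled exclusive buffer pushes out a surplus floating buffer immediately. */
    void recycleExclusiveBuffer() {
        exclusive++;
        if (available() > required() && floating > 0) {
            floating--;
        }
    }

    public static void main(String[] args) {
        ChannelCreditSketch channel = new ChannelCreditSketch(2);
        channel.onSenderBacklog(2);          // step 1
        System.out.println(channel.available()); // 4
        channel.useExclusiveBuffer(1);       // step 2
        System.out.println(channel.available()); // 3
        channel.recycleExclusiveBuffer();    // step 3
        System.out.println(channel.available() + " available, " + channel.floating() + " floating");
    }
}
```

In the same model, an idle channel whose sender announces a backlog of 1 for an empty {{BufferConsumer}} would hold one floating buffer without receiving any data, which is the inefficiency this issue describes.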
[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption
[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16732053#comment-16732053 ]

Piotr Nowojski commented on FLINK-11082:
----------------------------------------

> Furthermore, the backlog would affect requesting floating buffers on
> downstream side. That means some floating buffers are fetched in advance but
> not be used for long time, so the floating buffers are not made use of
> efficiently.

I think that wouldn't be an issue if floating vs exclusive were an abstract concept. For example, if we assign a floating credit to a channel (bumping the number of assigned buffers from 2 up to 3), then once we release ANY buffer from this channel and the number goes back down to "2", that could be interpreted as the channel having released the floating buffer and currently holding only two exclusive buffers.

As far as I remember, currently that's not the case, right? If we have assigned 2 exclusive buffers and then assign one floating buffer, the floating buffer is released only once we have processed all three of those buffers, right? Maybe that's the root problem that we should fix? I think I vaguely remember this same behaviour causing some other problems.
[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption
[ https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710966#comment-16710966 ]

zhijiang commented on FLINK-11082:
----------------------------------

[~pnowojski], what do you think of this issue?