[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption

2019-01-10 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16740056#comment-16740056
 ] 

zhijiang commented on FLINK-11082:
--

Regarding the other issue you mentioned, I think the higher CPU usage and the
higher number of packets are not caused by this bug.

Although the backlog is increased as soon as a buffer is added to the queue,
this partial buffer is not transported over the network. Whether data can be
transported is determined by two conditions together:
 * Data available: a buffer is finished or a flush has been triggered.
 * Credit available: credit > 0 or the next element is an event.

So the data-available condition might not be satisfied in
{{PipelinedSubpartition#isAvailable()}} even though the increased backlog makes
credit available.
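
To make the two conditions concrete, here is a minimal sketch. It is not the Flink implementation; the class {{TransportAvailabilitySketch}} and its method names are hypothetical and only encode the rule stated above.

```java
/** Hypothetical, simplified model of the two transport conditions. */
final class TransportAvailabilitySketch {

    /** Data is available once a buffer is finished or a flush was triggered. */
    static boolean dataAvailable(boolean hasFinishedBuffer, boolean flushRequested) {
        return hasFinishedBuffer || flushRequested;
    }

    /** Credit is available if credit > 0 or the next element is an event. */
    static boolean creditAvailable(int credit, boolean nextIsEvent) {
        return credit > 0 || nextIsEvent;
    }

    /** Transport requires both conditions at the same time. */
    static boolean canTransport(
            boolean hasFinishedBuffer, boolean flushRequested, int credit, boolean nextIsEvent) {
        return dataAvailable(hasFinishedBuffer, flushRequested)
                && creditAvailable(credit, nextIsEvent);
    }
}
```

With a partial buffer and no flush, {{canTransport(false, false, 2, false)}} is false even though credit was granted, which is the case argued above.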

Your other point is partially true. We always try to assign two additional
credits to the sender in a best-effort way, so we could assume that the sender
never needs to wait for a credit notification when data becomes available. But
these two additional credits may come from exclusive or floating buffers, and
there might be fewer than two. E.g. if the exclusive buffers are already used
and queued in the input channel, and there are not enough floating buffers in
the pool, then the sender might get only one or zero additional credits when
idle.
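
The best-effort nature of the two additional credits can be illustrated with a small calculation. This is only a sketch under the assumptions of this comment; {{IdleCreditSketch}} and its parameters are hypothetical names, not Flink API.

```java
/** Hypothetical sketch: how many additional credits an idle channel can get. */
final class IdleCreditSketch {

    /**
     * @param targetCredits  credits we would like to announce (two in the text above)
     * @param freeExclusive  exclusive buffers not yet used by the input channel
     * @param floatingInPool floating buffers currently obtainable from the pool
     */
    static int idleCredits(int targetCredits, int freeExclusive, int floatingInPool) {
        int fromExclusive = Math.min(targetCredits, freeExclusive);
        int fromFloating = Math.min(targetCredits - fromExclusive, floatingInPool);
        return fromExclusive + fromFloating;
    }
}
```

For example, {{idleCredits(2, 0, 1)}} yields 1 and {{idleCredits(2, 0, 0)}} yields 0, the "one or zero" cases mentioned above.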

Do you remember whether the problem the user reported was for
{{PipelinedSubpartition}} or {{SpillableSubpartition}}? For the pipelined case
I cannot currently think of anything that would cause it. But for the blocking
case, I need to double-check whether the current implementation might send more
{{BufferResponse}} messages than before.

> Increase backlog only if it is available for consumption
> 
>
> Key: FLINK-11082
> URL: https://issues.apache.org/jira/browse/FLINK-11082
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Affects Versions: 1.5.6, 1.6.3, 1.7.1, 1.8.0
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Major
>
> The backlog should indicate how many buffers are available in the subpartition
> for downstream consumption. Availability depends on two factors: the
> {{BufferConsumer}} being finished, or a flush being triggered.
> In the current implementation, the backlog is increased as soon as a
> {{BufferConsumer}} is added to the subpartition, even though this
> {{BufferConsumer}} is not yet available for network transport.
> Furthermore, the backlog affects how floating buffers are requested on the
> downstream side. As a result, some floating buffers are fetched in advance but
> not used for a long time, so the floating buffers are not used efficiently.
> We found this scenario to be especially pronounced with the rebalance selector
> on the upstream side, so we want to increase the backlog only when a
> {{BufferConsumer}} is finished or a flush is triggered.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption

2019-01-10 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739998#comment-16739998
 ] 

zhijiang commented on FLINK-11082:
--

[~pnowojski], thanks for the replies!

It might not make sense to decide whether to increase the backlog when adding a
new buffer based on whether the queue is currently empty. There are two
concerns:

1. If there are only event buffers in the queue, the backlog should not be
increased when the first data buffer is added. So another boolean flag might be
needed to indicate whether the added data buffer is the first one.

2. Take another case, ignoring the flusher and events.
 * The writer adds the first buffer to the queue; the backlog should be 0.
 * The writer adds the second buffer, which increases the backlog to 1 and
triggers the data-available notification for transport.
 * The netty thread pops the first buffer from the queue, decreasing the
backlog to 0. Note that the second buffer might already be finished by the
writer while the third buffer has not been added to the queue yet. The
subpartition is then still available for transport although the backlog is 0.
 * The netty thread pops the second buffer from the queue, decreasing the
backlog to -1. After that the third buffer is added to the queue, increasing
the backlog to 0. Only one unfinished buffer is left in the queue. So the
backlog value might sometimes be -1.

When a buffer is finished by the writer, the backlog should in theory be
increased. But the increase is delayed until the next buffer is added. Before
the backlog is increased, the netty thread can see the finished buffer and
decrease the backlog in advance. This causes the problem above.
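
The interleaving in concern 2 can be replayed deterministically. The sketch below is not Flink code; {{DelayedBacklogIncrementSketch}} is a hypothetical single-threaded model in which every add after the very first one increases the backlog and every pop decreases it.

```java
/** Hypothetical model: the backlog increase is delayed until the next add. */
final class DelayedBacklogIncrementSketch {
    private int backlog;
    private int queuedBuffers;
    private boolean firstBufferSeen;

    /** Writer side: adding buffer N is what announces that buffer N-1 is finished. */
    int add() {
        queuedBuffers++;
        if (firstBufferSeen) {
            backlog++;
        } else {
            firstBufferSeen = true;
        }
        return backlog;
    }

    /** Netty side: popping a finished buffer decreases the backlog. */
    int poll() {
        if (queuedBuffers == 0) {
            throw new IllegalStateException("queue is empty");
        }
        queuedBuffers--;
        backlog--;
        return backlog;
    }

    /** Replays: add, add, poll, poll (second buffer already finished), add. */
    static int[] replayInterleaving() {
        DelayedBacklogIncrementSketch s = new DelayedBacklogIncrementSketch();
        return new int[] {s.add(), s.add(), s.poll(), s.poll(), s.add()};
    }
}
```

The replay returns the backlog after each step, and the fourth value is -1: the pop of the second buffer is observed before the add that would have accounted for it.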

Taking the flusher into account, the conditions seem more complicated. I also
considered some other approaches before. One way to solve the above issue is to
introduce a {{notifyBufferBuilderFinished}} method in the
{{ResultPartitionWriter}} interface. {{RecordWriter}} should call
{{notifyBufferBuilderFinished}} before calling {{BufferBuilder#finish()}}, and
the backlog is increased in the {{notifyBufferBuilderFinished}} implementation.
In other words, the backlog should always be increased first and only then
decreased.

Taking the flusher into account in the above proposal:
 * When a flush is triggered, whether to increase the backlog depends on
whether the last buffer is finished. If there are 2 buffers in the queue and
the second buffer is already finished, the backlog is still 2 when the flush is
triggered. If the second buffer is not finished, the flush increases the
backlog from 1 to 2.
 * On {{notifyBufferBuilderFinished}}, if {{flushRequested}} is true, the
backlog should not increase because the flush has already done that; otherwise
increase the backlog. E.g. when the second buffer is finished and a flush was
already triggered before, the backlog should still be 2.
 * When adding a new buffer to the queue, if {{flushRequested}} is true, the
backlog should increase to reflect the flush. E.g. when the third buffer is
added and {{flushRequested}} is true, the backlog should increase from 2 to 3.
 * When peeking the last buffer in the queue, whether or not the buffer is
finished, the backlog becomes 0 and {{flushRequested}} becomes false.
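
The four rules above can be written down as a small state machine. This is a single-threaded sketch, not the Flink implementation: {{NotifyFinishedBacklogSketch}} is a hypothetical class, synchronization is omitted, and ignoring a flush on an empty queue or a repeated flush is an assumption the proposal does not spell out.

```java
/** Hypothetical model of the notifyBufferBuilderFinished proposal with flushing. */
final class NotifyFinishedBacklogSketch {
    private int queued;              // buffers currently in the queue
    private boolean lastFinished;    // whether the tail buffer is finished
    private boolean flushRequested;
    private int backlog;

    /** Adding a buffer increases the backlog only while a flush is pending. */
    void add() {
        queued++;
        lastFinished = false;
        if (flushRequested) {
            backlog++;
        }
    }

    /** Called before BufferBuilder#finish(); skipped if the flush already counted it. */
    void notifyBufferBuilderFinished() {
        if (queued == 0 || lastFinished) {
            throw new IllegalStateException("no unfinished tail buffer");
        }
        if (!flushRequested) {
            backlog++;
        }
        lastFinished = true;
    }

    /** A flush counts the unfinished tail buffer once (assumption: repeated flushes are no-ops). */
    void flush() {
        if (queued == 0 || flushRequested) {
            return;
        }
        if (!lastFinished) {
            backlog++;
        }
        flushRequested = true;
    }

    /** Consumer side: handling the last buffer resets the backlog instead of decrementing it. */
    void poll() {
        if (queued == 0) {
            throw new IllegalStateException("queue is empty");
        }
        if (queued == 1) {
            backlog = 0;
            flushRequested = false;
            if (lastFinished) {
                queued = 0;          // finished buffer is removed, unfinished one stays
            }
        } else {
            queued--;
            backlog--;
        }
    }

    int backlog() {
        return backlog;
    }
}
```

Replaying the examples from the list (two buffers, flush, finish the second, add a third) gives 1, 2, 2, 3, and three subsequent polls give 2, 1, 0, so the value never drops below zero in this model.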

This makes sense for {{PipelinedSubpartition}}. But it adds a separate
synchronization for increasing the backlog, whereas the current approach
already runs under the synchronization used for adding a buffer. For
{{SpillableSubpartition}} I need to think it through a bit more.



[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption

2019-01-10 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739385#comment-16739385
 ] 

Piotr Nowojski commented on FLINK-11082:


Another issue.

Could this bug explain why one user recently reported higher CPU usage and a
300% increase in the number of packets sent between the nodes after upgrading
from Flink 1.4? Previously we were aware that credit-based flow control
increases the network traffic/number of messages sent between nodes by 100%.
But if we announce the fresh partial buffers immediately to the receiver, could
it be that a small chunk of that data is being sent prematurely, before
{{flushRequested}} is set or the next {{BufferConsumer}} is enqueued? Sending a
chunk of data prematurely and assigning new credit would explain the remaining
unaccounted "200%" of messages being sent.

Btw, [~zjwang] if a channel is idle, two exclusive buffers will be assigned to
the sender, so it will have some buffers for immediate use whenever the channel
becomes active?



[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption

2019-01-10 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739380#comment-16739380
 ] 

Piotr Nowojski commented on FLINK-11082:


[~zjwang] why should we report a backlog of -1? Shouldn't your last point work
like this:

- Peek the last buffer from the queue; {{flushRequested}} is set to false.
Regardless of whether this last buffer is finished or not, the backlog should
be decreased to 0. If the buffer is finished, pop it from the queue. If the
buffer is not finished, keep it with a backlog of 0.
- The backlog increases to 1 if we flush or if we enqueue a second buffer.



[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption

2019-01-10 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16739246#comment-16739246
 ] 

zhijiang commented on FLINK-11082:
--

Considering the implementation, handling both the finished {{BufferBuilder}}
and the flush factors seems a bit complicated.

Increasing the backlog could be triggered by two conditions: adding the second
or a later {{BufferConsumer}}, and a flush being triggered.

Decreasing the backlog could also be triggered by two conditions: popping a
finished {{BufferConsumer}} from the queue, and {{flushRequested}} being set to
false for the last element in the queue. So the backlog might sometimes be
{{-1}}. For example:
 * Two {{BufferConsumer}}s are in the queue and the second one is unfinished,
so the backlog is 1.
 * One more {{BufferConsumer}} is added, three buffers are in the queue, and
the backlog is 2.
 * A flush is triggered, still three {{BufferConsumer}}s are in the queue, and
the backlog is 3.
 * One {{BufferConsumer}} is popped from the queue, two buffers are left, and
the backlog is 2.
 * One {{BufferConsumer}} is popped again, one buffer is left, and the backlog
is 1.
 * The last buffer is peeked from the queue and {{flushRequested}} is set to
false. If this last buffer is not finished, the backlog is decreased to 0,
which seems fine. But if the last buffer is already finished, it is popped from
the queue and the backlog is decreased again, to -1. When a new
{{BufferConsumer}} is added to the queue, the backlog increases to 0.

So the backlog might be decreased twice for the last buffer in the queue.
Although the resulting backlog is actually correct, the value of -1 might look
strange. What do you think?
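
The six steps above can be replayed with a tiny single-threaded model. This is only a sketch of the scheme described in this comment, not Flink code; {{FlushDoubleDecrementSketch}} and its methods are hypothetical names.

```java
/** Hypothetical model: increase on add (after the first) and on flush, decrease on pop and flush reset. */
final class FlushDoubleDecrementSketch {
    private int queued;
    private int backlog;
    private boolean firstAdded;
    private boolean flushRequested;

    /** Every BufferConsumer after the very first one increases the backlog. */
    int add() {
        queued++;
        if (firstAdded) {
            backlog++;
        } else {
            firstAdded = true;
        }
        return backlog;
    }

    /** A flush makes the unfinished tail buffer count as well. */
    int flush() {
        if (queued > 0 && !flushRequested) {
            flushRequested = true;
            backlog++;
        }
        return backlog;
    }

    /** Consumer side: takes the head buffer, which is assumed to be finished. */
    int pollFinished() {
        if (queued == 0) {
            throw new IllegalStateException("queue is empty");
        }
        if (queued == 1 && flushRequested) {
            flushRequested = false;
            backlog--;               // first decrement: flush reset on the last element
        }
        queued--;
        backlog--;                   // second decrement: finished buffer popped
        return backlog;
    }

    /** Replays the example: three adds, a flush, three pops, one more add. */
    static int[] replay() {
        FlushDoubleDecrementSketch s = new FlushDoubleDecrementSketch();
        return new int[] {
            s.add(), s.add(), s.add(), s.flush(),
            s.pollFinished(), s.pollFinished(), s.pollFinished(), s.add()
        };
    }
}
```

The returned trace contains -1 right after the last buffer is popped and returns to 0 once the next {{BufferConsumer}} is added, matching the double decrement described above.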



[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption

2019-01-08 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16736890#comment-16736890
 ] 

Piotr Nowojski commented on FLINK-11082:


Just for the record +1 from my side that this should be fixed.



[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption

2019-01-07 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16736665#comment-16736665
 ] 

zhijiang commented on FLINK-11082:
--

[~NicoK] yes, these two features were developed in parallel for Flink 1.5,
which might have resulted in this problem. My initial thought for fixing this
issue is just what you described. :)



[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption

2019-01-07 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16736663#comment-16736663
 ] 

zhijiang commented on FLINK-11082:
--

[~pnowojski], your understanding is right. It might be more obvious for a batch
job with the flush mechanism disabled and in keyBy mode.

 



[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption

2019-01-07 Thread Nico Kruber (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16735835#comment-16735835
 ] 

Nico Kruber commented on FLINK-11082:
-

Nice find [~zjwang]. I guess the current behaviour comes from the two changes
we were making in parallel: without the low-latency changes, whenever a buffer
was added it was ready to consume, which is no longer the case.
{{PipelinedSubpartition#add()}} always increases the backlog for any
{{BufferConsumer}}, which is initially empty and not ready for consumption, as
noted.

Instead, we can increase the backlog when adding a second {{BufferConsumer}} or
when flushing, taking care not to increase it too often.
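
One hedged way to avoid "increasing too often" is to derive the announced backlog from the queue state instead of incrementing a counter in several places. The sketch below only illustrates that idea; it is not claimed to be the fix adopted for this issue, and {{DerivedBacklogSketch}} is a hypothetical name.

```java
/** Hypothetical sketch: compute the consumable backlog from queue state. */
final class DerivedBacklogSketch {

    /**
     * @param queuedBuffers      number of BufferConsumers currently in the queue
     * @param lastBufferFinished whether the tail BufferConsumer is finished
     * @param flushRequested     whether a flush is pending
     */
    static int availableBacklog(
            int queuedBuffers, boolean lastBufferFinished, boolean flushRequested) {
        if (queuedBuffers == 0) {
            return 0;
        }
        // The tail buffer only counts once it is finished or a flush exposes its data.
        boolean lastIsConsumable = lastBufferFinished || flushRequested;
        return lastIsConsumable ? queuedBuffers : queuedBuffers - 1;
    }
}
```

A single empty {{BufferConsumer}} yields 0, adding a second one yields 1, and a flush makes the unfinished tail count as well.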

> Increase backlog only if it is available for consumption
> 
>
> Key: FLINK-11082
> URL: https://issues.apache.org/jira/browse/FLINK-11082
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Affects Versions: 1.8.0
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Minor


[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption

2019-01-07 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16735815#comment-16735815
 ] 

Piotr Nowojski commented on FLINK-11082:


Hmmm, I think I’m starting to understand the issue. Currently, the credit-based
model always tries to maintain 2 spare buffers per remote channel. If there are
3 buffers in the backlog that are already used, it will try to acquire and
assign 3 floating buffers on top of the 2 exclusive buffers for that channel.

That, combined with empty `BufferConsumers` bumping the backlog to 1, means
that floating buffers are useless: they are always assigned somewhere
(completely randomly) even at very low throughput, or even with no throughput
at all. If output flushing is disabled and we suddenly freeze production of
records for a couple of minutes, no data will be sent and input queues will be
empty, yet because of those “empty” enqueued `BufferConsumers` bumping the
backlog to 1, all floating buffers will be assigned and frozen/wasted somewhere.

Whereas the original intention was to assign floating buffers to “heavily” used
channels, and this doesn’t happen right now?
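
The effect of a phantom backlog of 1 on floating buffer demand can be shown with simple arithmetic. This is a sketch, not Flink code; `FloatingBufferDemandSketch` is a hypothetical name, and the formula assumes a channel tries to keep backlog plus 2 buffers available.

```java
/** Hypothetical sketch: floating buffers a channel asks for, given its announced backlog. */
final class FloatingBufferDemandSketch {

    /** Spare (exclusive) buffers per remote channel; 2 is the value mentioned above. */
    static final int INITIAL_CREDIT = 2;

    static int floatingBuffersToRequest(int backlog, int availableBuffers) {
        int required = backlog + INITIAL_CREDIT;
        return Math.max(0, required - availableBuffers);
    }
}
```

With a backlog of 3 and 2 exclusive buffers available, 3 floating buffers are requested. An idle channel that announces a backlog of 1 because of an empty `BufferConsumer` still requests 1, whereas an accurate backlog of 0 would request none.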



[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption

2019-01-07 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16735597#comment-16735597
 ] 

zhijiang commented on FLINK-11082:
--

Thanks for replying, [~pnowojski] :)

If I understood correctly, you described a scenario like this:
 # If the backlog is 2 for one input channel and there are currently 2
available exclusive buffers, then this input channel requests 2 floating
buffers to maintain a total of 4 available credits.
 # If one exclusive buffer is used for receiving network data, the backlog
becomes 1 and there are 3 available credits.
 # If that exclusive buffer is consumed by the task and then recycled, there
are 4 available credits in total, which triggers returning one extra floating
buffer directly. There is no need to wait for the other exclusive buffer ahead
of it. That means we always try to maintain {{backlog+initialCredit}} available
credits, and if some exclusive buffers are recycled, the corresponding number
of floating buffers is returned immediately as a result. The related logic is
in {{RemoteInputChannel#AvailableBufferQueue#addExclusiveBuffer}}.

 

So the distribution of floating buffers is currently determined by the backlog
size, but the backlog size does not accurately reflect how many buffers are
ready for transport in the network stack.
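
The three steps of the scenario can be traced with a simplified model of the buffer queue. This is a hedged sketch, not the actual {{AvailableBufferQueue}}: the class {{AvailableBufferQueueSketch}} and its methods are hypothetical, buffers are represented only by counters, and the pool is assumed to always serve floating buffer requests.

```java
/** Hypothetical counter-based model of one input channel's available buffers. */
final class AvailableBufferQueueSketch {
    private final int initialCredit;
    private int exclusive;
    private int floating;

    AvailableBufferQueueSketch(int initialCredit) {
        this.initialCredit = initialCredit;
        this.exclusive = initialCredit;
    }

    int available() {
        return exclusive + floating;
    }

    int floating() {
        return floating;
    }

    /** Requests floating buffers until backlog + initialCredit buffers are available. */
    void onBacklog(int backlog) {
        int missing = backlog + initialCredit - available();
        if (missing > 0) {
            floating += missing;     // assumption: the pool can always serve the request
        }
    }

    /** One exclusive buffer is filled with data received from the network. */
    void useExclusive() {
        if (exclusive == 0) {
            throw new IllegalStateException("no exclusive buffer available");
        }
        exclusive--;
    }

    /** Recycles an exclusive buffer; returns true if a surplus floating buffer was handed back. */
    boolean recycleExclusive(int backlog) {
        exclusive++;
        if (floating > 0 && available() > backlog + initialCredit) {
            floating--;
            return true;
        }
        return false;
    }
}
```

Starting with {{initialCredit}} 2 and a backlog of 2, the channel holds 4 buffers. After one exclusive buffer is used (backlog 1) and recycled, one floating buffer is returned and 3 buffers remain available.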



[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption

2019-01-02 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16732053#comment-16732053
 ] 

Piotr Nowojski commented on FLINK-11082:


> Furthermore, the backlog would affect requesting floating buffers on 
> downstream side. That means some floating buffers are fetched in advance but 
> not be used for long time, so the floating buffers are not made use of 
> efficiently.

I think that wouldn't be an issue if floating vs exclusive were an abstract
concept. For example, if we assign a floating credit to a channel (bumping the
number of assigned buffers from 2 up to 3), then once we release ANY buffer
from this channel and the number goes back down to "2", that could be
interpreted as equivalent to saying that the channel released the floating
buffer and currently holds only two exclusive buffers. As far as I remember,
currently that's not the case, right? If we have assigned 2 exclusive buffers
and then assign one floating buffer, the floating buffer is released only once
we have processed all three of those buffers, right?

Maybe that's the root problem that we should fix? I think I vaguely remember 
this same behaviour causing some other problems.



[jira] [Commented] (FLINK-11082) Increase backlog only if it is available for consumption

2018-12-05 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710966#comment-16710966
 ] 

zhijiang commented on FLINK-11082:
--

[~pnowojski], what do you think of this issue?
