[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-10 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881843#comment-16881843
 ] 

Piotr Nowojski commented on FLINK-13100:


You are right [~kevin.cyj], it seems that the bug was directly carried over 
from the older implementation.

> Fix the unexpected IOException during FileBufferReader#nextBuffer
> -
>
> Key: FLINK-13100
> URL: https://issues.apache.org/jira/browse/FLINK-13100
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Blocker
>
> In the implementation of FileBufferReader#nextBuffer, we expect the next 
> memory segment to always be available, based on the assumption that nextBuffer 
> can only be called after the previous buffer has been recycled. 
> Otherwise the current implementation throws an IOException.
> In fact, this assumption does not hold given the credit-based flow control 
> and zero-copy features of the network stack. The detailed process is as follows:
>  * The netty thread calls channel.writeAndFlush() in 
> PartitionRequestQueue and adds a listener to handle the ChannelFuture later. 
> Until the future is done, the corresponding buffer is not recycled because of 
> the zero-copy improvement.
>  * Before the previous future is done, the netty thread could trigger the next 
> writeAndFlush while processing an AddCredit message; 
> FileBufferReader#nextBuffer then throws an exception because the previous buffer 
> has not been recycled yet.
> We considered several ways to solve this potential bug:
>  * Do not trigger the next writeAndFlush before the previous future is 
> done. This requires maintaining the future state and checking it in the relevant 
> actions. I suspect it might cause a regression in network throughput 
> and it adds extra state management.
>  * Adjust the current implementation of FileBufferReader. We previously regarded 
> the blocking partition view as always available because of the next buffer read 
> ahead, so it was always added to the available queue in 
> PartitionRequestQueue. Actually, this read-ahead buffer only simplifies 
> BoundedBlockingSubpartitionReader#notifyDataAvailable. The view's 
> availability could instead be judged by the available buffers in FileBufferReader 
> rather than by the read-ahead buffer. When a buffer is recycled into 
> FileBufferReader after writeAndFlush is done, it could call notifyDataAvailable 
> to add this view to the available queue in PartitionRequestQueue.
> I prefer the second way because it should not have any negative impact.
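A minimal sketch of the second proposal, assuming a reader that owns a fixed number of memory segments: availability is derived from free segments rather than from a pre-read next buffer, and recycling a segment re-announces availability. The class name, the AvailabilityListener callback and the byte[] segments are hypothetical stand-ins, not Flink's FileBufferReader API.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of proposal 2: the reader is "available" only while it still has
// data AND a free memory segment; recycling a segment re-announces availability.
// Names are illustrative and do not reflect Flink's actual classes.
class FileBufferReaderSketch {

    /** Stand-in for BoundedBlockingSubpartitionReader#notifyDataAvailable. */
    interface AvailabilityListener {
        void notifyDataAvailable();
    }

    private final Queue<byte[]> freeSegments = new ArrayDeque<>();
    private final AvailabilityListener listener;
    private int remainingBuffers;

    FileBufferReaderSketch(int numSegments, int segmentSize, int buffersInFile,
                           AvailabilityListener listener) {
        for (int i = 0; i < numSegments; i++) {
            freeSegments.add(new byte[segmentSize]);
        }
        this.remainingBuffers = buffersInFile;
        this.listener = listener;
    }

    /** The view should only be enqueued while this returns true. */
    boolean isAvailable() {
        return remainingBuffers > 0 && !freeSegments.isEmpty();
    }

    /**
     * Returns the next buffer, or null if no data is left or every segment is still
     * held by Netty. The reported code threw an IOException in the latter case.
     */
    byte[] nextBuffer() {
        if (!isAvailable()) {
            return null;
        }
        byte[] segment = freeSegments.poll();
        remainingBuffers--;
        // Reading the file contents into 'segment' is omitted in this sketch.
        return segment;
    }

    /** Called once Netty has finished with the buffer and releases it. */
    void recycle(byte[] segment) {
        boolean wasAvailable = isAvailable();
        freeSegments.add(segment);
        if (!wasAvailable && isAvailable()) {
            listener.notifyDataAvailable();
        }
    }
}
```

With two segments, a third nextBuffer() call before any recycle() returns null instead of failing, and the recycle() that frees a segment fires the listener once.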



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-09 Thread Yingjie Cao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881129#comment-16881129
 ] 

Yingjie Cao commented on FLINK-13100:
-

The deadlock problem of the spillable subpartition has already been reported in this JIRA issue: 
https://issues.apache.org/jira/browse/FLINK-12329.

The following exception stack trace offers more information:

2019-07-09 12:35:54

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Fatal error at remote task manager 'hadoop5035.et2.tbsite.net/11.180.36.89:32383'.
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:269)
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:175)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
    at java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: Bug in BoundedBlockingSubpartition with FILE data: Requesting new buffer before previous buffer returned.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:267)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.channelWritabilityChanged(PartitionRequestQueue.java:204)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:434)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:416)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelWritabilityChanged(AbstractChannelHandlerContext.java:409)
    at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelWritabilityChanged(ChannelInboundHandlerAdapter.java:119)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:434)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:416)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelWritabilityChanged(AbstractChannelHandlerContext.java:409)
    at ...

[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-09 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881029#comment-16881029
 ] 

zhijiang commented on FLINK-13100:
--

Yes, an ITCase is also necessary, and I think there should already be IT cases 
for blocking partitions. The key problem is that we have two different 
implementations of blocking partitions that are used in practice, and the 
choice between them is made automatically based on whether the system is 
32-bit or 64-bit. So the 32-bit path (file-file) might never have been 
exercised by the existing IT cases. I think we need to make the file-file 
mode configurable in the existing IT cases.
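One way to make the file-file path reachable from IT cases is to resolve the implementation from an explicit setting first and fall back to the automatic 32/64-bit choice only when nothing is configured. The sketch below assumes a hypothetical enum and a hypothetical system property "blocking.subpartition.type"; only "sun.arch.data.model" is a real JVM property (set by HotSpot-based JVMs, possibly absent on others).

```java
// Hypothetical sketch: choose the blocking-subpartition implementation from the JVM's
// data model, but let tests force a specific path. The enum, the property name
// "blocking.subpartition.type" and the methods are illustrative, not Flink's API.
import java.util.Locale;

enum BlockingSubpartitionType { FILE, MMAP, AUTO }

class BlockingSubpartitionSelector {

    /** An explicit FILE or MMAP wins; AUTO falls back to the 32/64-bit heuristic. */
    static BlockingSubpartitionType resolve(BlockingSubpartitionType configured, String dataModel) {
        if (configured != BlockingSubpartitionType.AUTO) {
            return configured; // e.g. FILE forced by an IT case on a 64-bit machine
        }
        // Unknown or 32-bit data model: use the file-file implementation.
        return "64".equals(dataModel) ? BlockingSubpartitionType.MMAP : BlockingSubpartitionType.FILE;
    }

    /** Reads the hypothetical override; an invalid value throws IllegalArgumentException. */
    static BlockingSubpartitionType fromSystemProperties() {
        String raw = System.getProperty("blocking.subpartition.type", "AUTO");
        BlockingSubpartitionType configured = BlockingSubpartitionType.valueOf(raw.toUpperCase(Locale.ROOT));
        return resolve(configured, System.getProperty("sun.arch.data.model"));
    }
}
```

An IT case could then be parameterized over FILE and MMAP so that both code paths run regardless of the host architecture.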



[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-09 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881009#comment-16881009
 ] 

Piotr Nowojski commented on FLINK-13100:


[~zjwang] besides unit testing this, can we add more IT cases using blocking 
subpartitions? Unit testing such behaviour might be fragile, and we could lose 
test coverage after some refactorings, whereas for pipelined subpartitions we 
seem to have very good indirect test coverage in the ITCases.



[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-09 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881003#comment-16881003
 ] 

zhijiang commented on FLINK-13100:
--

[~pnowojski] the previous deadlock did happen on our side, and we fixed it 
internally at Alibaba. Since the first version of the new 
BoundedBlockingSubpartition focused only on the mmap approach, we did not worry 
about this issue then. Now that 32-bit systems use the file-file approach, it is 
very easy to trigger this problem when a flush is pending, as I described 
above.



[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-09 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880999#comment-16880999
 ] 

zhijiang commented on FLINK-13100:
--

In netty's internal implementation of writeAndFlush(), the write operation can 
always complete, but the Flink buffer is not released yet. The flush operation 
can only complete after the write when the channel is ready for 
SelectionKey.OP_WRITE, which means the socket send buffer has enough space to hold 
the flushed message; otherwise the flush is completed later, with a following 
writeAndFlush().

In summary, completion of the writeAndFlush() channel future only indicates that 
the write is done; the flush is not always done. But with the zero-copy 
improvement in the netty stack, the Flink buffer is only recycled after the 
flush is done. This is what causes this issue.
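The write/flush split described above can be illustrated with a toy outbound queue. This is a simplified model under stated assumptions, not Netty's ChannelOutboundBuffer: write() only keeps a reference to the Flink buffer (zero-copy), and the buffer is recycled only once flush() can hand it to a socket with free space. The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of the behaviour described above, NOT Netty's real outbound buffer:
// write() only queues a reference to the Flink buffer, and the buffer is recycled
// only when flush() actually hands it to a socket that has room.
class ToyOutboundBuffer {

    private final Deque<String> pending = new ArrayDeque<>();
    private final List<String> recycled = new ArrayList<>();
    private int socketCapacity; // how many buffers the socket can accept right now

    ToyOutboundBuffer(int socketCapacity) {
        this.socketCapacity = socketCapacity;
    }

    /** Always succeeds; the Flink buffer stays referenced (zero-copy). */
    void write(String flinkBuffer) {
        pending.addLast(flinkBuffer);
    }

    /** Drains as many buffers as the socket accepts, recycling each one. */
    void flush() {
        while (!pending.isEmpty() && socketCapacity > 0) {
            recycled.add(pending.removeFirst());
            socketCapacity--;
        }
    }

    /** Models the socket becoming writable again (the OP_WRITE case), then retries the flush. */
    void socketDrained(int freedSlots) {
        socketCapacity += freedSlots;
        flush();
    }

    List<String> recycled() {
        return recycled;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

With a full socket, write() followed by flush() leaves the buffer pending and unrecycled; it is recycled only when the socket becomes writable again.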



[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-09 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880988#comment-16880988
 ] 

Piotr Nowojski commented on FLINK-13100:


I'm missing something here. Why was the previous deadlock never detected, while 
this error was?

Regarding the potential fix, in this case shouldn't {{FileBufferReader}} 
have an equivalent of an {{isAvailable()}} method?



[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-08 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880465#comment-16880465
 ] 

zhijiang commented on FLINK-13100:
--

Yes, the previous SpilledSubpartitionView really did have the deadlock issue with 
two buffers. Now we have a similar issue, but an IOException is thrown 
instead.

Your understanding of the above two parts is right. But there is another case:
 * When the view is created, it is enqueued into the netty thread loop for the 
first time, because it has both data and credit, as you mentioned.
 * Then a buffer is fetched from the view and writeAndFlush() is called. If the 
view is still available in the queue, it must have credit, because the next 
buffer is always available due to the read-ahead. This case is fine, because we 
only wait for the previous writeAndFlush() to finish before triggering the next one.
 * But if the view is not available (no credit) when writeAndFlush() is called, 
then the queue in the netty thread loop is empty. Before the future of 
writeAndFlush() is done, the netty thread can still process an AddCredit message, 
which makes the view available again; the view is then added to the queue, which 
triggers the next writeAndFlush(). That means the next write is triggered while 
the previous buffer has not been recycled yet, which causes the problem. The 
sequence might look like this: first writeAndFlush() pending -> AddCredit 
(triggers the second writeAndFlush()) -> first writeAndFlush() finishes and 
recycles its buffer.
 * I think this might be caused by the improvement of reusing Flink buffers in 
the netty stack since release-1.5. We can split writeAndFlush() into two steps, 
write and flush. Previously, when the write step finished, the Flink buffer had 
already been copied into a netty-internal ByteBuffer and could be recycled, so 
there was no problem even if the second writeAndFlush() was triggered before the 
first one completed. But now the write step keeps referencing the Flink buffer in 
the netty stack until the flush is done.

I will try to mock this process in the relevant unit tests to verify it, and I 
might submit the test tomorrow to make it easier to understand.
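The interleaving in the third bullet can be replayed on a single thread. This sketch assumes a reader with two segments, one of which always holds the read-ahead buffer, and treats the first AddCredit event as the view initially becoming available. It compares the reported behaviour (credit alone triggers a write) with additionally requiring a free segment, as proposed in the issue description. All names are hypothetical.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded replay of:
//   first writeAndFlush() pending -> AddCredit -> first flush completes.
// Not Flink's PartitionRequestQueue or FileBufferReader.
class InterleavingReplay {

    /** Two memory segments; one is always occupied by the read-ahead buffer. */
    static class ReadAheadReader {
        private int freeSegments = 1; // 2 segments minus the initial read-ahead

        boolean hasFreeSegment() {
            return freeSegments > 0;
        }

        /** Hands the read-ahead buffer to Netty and reads the next buffer ahead. */
        void nextBuffer() throws IOException {
            if (freeSegments == 0) {
                throw new IOException("Requesting new buffer before previous buffer returned.");
            }
            freeSegments--;
        }

        /** Netty has flushed a buffer and returns its segment. */
        void recycle() {
            freeSegments++;
        }
    }

    /** gated == false: credit alone triggers a write; gated == true: a free segment is also required. */
    static List<String> replay(boolean gated) {
        ReadAheadReader reader = new ReadAheadReader();
        List<String> log = new ArrayList<>();
        Queue<String> events = new ArrayDeque<>(Arrays.asList("addCredit", "addCredit", "flushDone"));
        int credits = 0;
        try {
            while (!events.isEmpty()) {
                String event = events.poll();
                if (event.equals("addCredit")) {
                    credits++;
                } else { // "flushDone": Netty finally releases the first buffer
                    reader.recycle();
                    log.add("recycle");
                }
                if (credits > 0 && (!gated || reader.hasFreeSegment())) {
                    credits--;
                    reader.nextBuffer(); // starts another writeAndFlush()
                    log.add("send");
                } else if (credits > 0) {
                    log.add("wait"); // view stays out of the queue until recycle()
                }
            }
        } catch (IOException e) {
            log.add("IOException");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(replay(false)); // [send, IOException]
        System.out.println(replay(true));  // [send, wait, recycle, send]
    }
}
```

The ungated run fails with the same message as the reported IOException, while the gated run simply waits until the first buffer has been recycled.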



[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-08 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880340#comment-16880340
 ] 

Stephan Ewen commented on FLINK-13100:
--

[~zjwang] Could you help me understand a bit more about what exactly is happening?

To explain the thinking behind the current implementation:
I was trying to think about how this worked with the previous implementation, 
SpilledSubpartitionView:
https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java

My assumption was that this needs to be guaranteed to work with two buffers. 
The implementation referenced above had two buffers as well.
That implementation also had a blocking call to get the buffer. I was assuming 
that when Netty is the reader of the partition, the same thread (the netty 
thread for that channel) would be responsible both for writing the buffer to 
the socket and for returning the buffer.
So once the thread blocked on buffer availability, you would have a deadlock, 
because the thread could never complete the ongoing write and return the buffer.

Does that mean that the cases where the new implementation throws an 
IOException are cases where the previous implementation deadlocked?

I originally understood from the code that the following would happen:
  - when a partition becomes available (has data and credit), it is enqueued 
into the netty thread loop
  - if the partition was previously not among the active partitions, a new 
buffer is fetched and writeAndFlush() is called. If it was already in the 
available set, then we assume that a write is pending anyway, and we don't 
trigger another writeAndFlush(), because we rely on the previous write's future 
to trigger the next one.

I assume the second part does not quite behave like that?
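The two bullets above can be contrasted with zhijiang's third case in a small model of the trigger condition. In this sketch, which is an assumption rather than Flink's actual PartitionRequestQueue logic, a write is started whenever the available queue was empty; the optional trackPendingWrite flag adds the invariant assumed here (no new writeAndFlush() while one is pending), which corresponds to the first option in the issue description. All names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of when a queue starts a new write. Without trackPendingWrite,
// a write is triggered whenever the available queue was empty; with it, no new
// writeAndFlush() starts while one is still pending. Not Flink's actual code.
class AvailableQueueSketch {

    private final Queue<String> availableReaders = new ArrayDeque<>();
    private final boolean trackPendingWrite;
    private int writesInFlight;
    private int maxWritesInFlight;

    AvailableQueueSketch(boolean trackPendingWrite) {
        this.trackPendingWrite = trackPendingWrite;
    }

    /** Called when a reader gains data or credit. */
    void enqueue(String reader) {
        if (availableReaders.contains(reader)) {
            return; // already in the available set: a pending write's future will handle it
        }
        boolean triggerWrite = availableReaders.isEmpty()
                && !(trackPendingWrite && writesInFlight > 0);
        availableReaders.add(reader);
        if (triggerWrite) {
            writeNext();
        }
    }

    /** Stands in for writeAndFlush(...) with a listener attached to its future. */
    private void writeNext() {
        if (availableReaders.poll() == null) {
            return;
        }
        writesInFlight++;
        maxWritesInFlight = Math.max(maxWritesInFlight, writesInFlight);
    }

    /** Future listener: the previous write has completed. */
    void onWriteComplete() {
        writesInFlight--;
        writeNext();
    }

    /** Replays: initial enqueue -> AddCredit while the first write is pending -> first write completes. */
    static int replayMaxWritesInFlight(boolean trackPendingWrite) {
        AvailableQueueSketch queue = new AvailableQueueSketch(trackPendingWrite);
        queue.enqueue("view-1");  // data + credit: first write starts, queue is empty again
        queue.enqueue("view-1");  // AddCredit arrives while the first write is still pending
        queue.onWriteComplete();  // first write completes
        return queue.maxWritesInFlight;
    }
}
```

Without the pending-write check, the AddCredit that arrives while the first write is outstanding starts a second write (two in flight); with it, the second write waits for the first future, at the cost of the extra state mentioned in the description.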



[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer

2019-07-04 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878513#comment-16878513
 ] 

zhijiang commented on FLINK-13100:
--

What do you think, [~pnowojski] [~StephanEwen]?
