[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881843#comment-16881843 ] Piotr Nowojski commented on FLINK-13100:

You are right [~kevin.cyj], it seems that the bug was carried over directly from the older implementation.

> Fix the unexpected IOException during FileBufferReader#nextBuffer
> -----------------------------------------------------------------
>
> Key: FLINK-13100
> URL: https://issues.apache.org/jira/browse/FLINK-13100
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Network
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Blocker
>
> In the implementation of FileBufferReader#nextBuffer, we expect the next memory segment to always be available, based on the assumption that nextBuffer can only be called after the previous buffer has been recycled. Otherwise the current implementation throws an IOException.
> In fact, this assumption does not hold given the credit-based flow control and zero-copy features in the network stack. The detailed process is as follows:
> * The netty thread finishes calling channel.writeAndFlush() in PartitionRequestQueue and adds a listener to handle the ChannelFuture later. Until the future completes, the corresponding buffer is not recycled, because of the zero-copy improvement.
> * Before the previous future completes, the netty thread can trigger the next writeAndFlush by processing an addCredit message; FileBufferReader#nextBuffer then throws an exception because the previous buffer has not been recycled.
> We thought of several ways to solve this potential bug:
> * Do not trigger the next writeAndFlush before the previous future completes. This requires maintaining the future's state and checking it in the relevant actions. I worry it might cause a performance regression in network throughput and adds extra state management.
> * Adjust the implementation of the current FileBufferReader. We previously regarded the blocking partition view as always available, based on the read-ahead of the next buffer, so it was always added to the available queue in PartitionRequestQueue. Actually, this read-ahead only simplifies the process in BoundedBlockingSubpartitionReader#notifyDataAvailable. The view's availability could instead be judged by the available buffers in FileBufferReader. When a buffer is recycled back into FileBufferReader after writeAndFlush completes, it could call notifyDataAvailable to add the view to the available queue in PartitionRequestQueue.
> I prefer the second way because it would not have any negative impact.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
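The second proposed fix can be sketched as follows. This is a minimal, hypothetical mock, not Flink's actual API: the class and method names are illustrative, and the byte[] pool stands in for Flink's memory segments. Availability is judged by whether the reader still owns a free buffer, and recycling a buffer re-notifies the queue.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of the second proposed fix: the view's availability is
// judged by whether the reader still owns a free buffer, instead of assuming
// the read-ahead buffer is always present. Names are illustrative only.
class MockFileBufferReader {
    private final Queue<byte[]> freeBuffers = new ArrayDeque<>();
    private Runnable availabilityListener = () -> { };

    MockFileBufferReader(int numBuffers, int bufferSize) {
        for (int i = 0; i < numBuffers; i++) {
            freeBuffers.add(new byte[bufferSize]);
        }
    }

    void setAvailabilityListener(Runnable listener) {
        this.availabilityListener = listener;
    }

    // stands in for the availability check PartitionRequestQueue would use
    boolean isAvailable() {
        return !freeBuffers.isEmpty();
    }

    // no longer assumes the previous buffer was recycled; callers simply
    // must not ask for a buffer while isAvailable() is false
    byte[] nextBuffer() {
        return freeBuffers.poll(); // null when every buffer is in flight
    }

    // invoked from the ChannelFuture listener once writeAndFlush() is done;
    // the listener call stands in for notifyDataAvailable() re-enqueueing
    void recycle(byte[] buffer) {
        freeBuffers.add(buffer);
        availabilityListener.run();
    }
}
```

With this shape, the "requesting new buffer before previous buffer returned" state is simply "not available" rather than an error, which is why the fix does not need to track the pending write's future.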
[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881129#comment-16881129 ] Yingjie Cao commented on FLINK-13100:

The deadlock problem of the spillable subpartition has been reported in this jira: https://issues.apache.org/jira/browse/FLINK-12329. The following exception stacks offer more information:

2019-07-09 12:35:54
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Fatal error at remote task manager 'hadoop5035.et2.tbsite.net/11.180.36.89:32383'.
	at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:269)
	at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:175)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
	at java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: Bug in BoundedBlockingSubpartition with FILE data: Requesting new buffer before previous buffer returned.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:267)
	at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.channelWritabilityChanged(PartitionRequestQueue.java:204)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:434)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:416)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelWritabilityChanged(AbstractChannelHandlerContext.java:409)
	at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelWritabilityChanged(ChannelInboundHandlerAdapter.java:119)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:434)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelWritabilityChanged(AbstractChannelHandlerContext.java:416)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelWritabilityChanged(AbstractChannelHandlerContext.java:409)
	at
[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881029#comment-16881029 ] zhijiang commented on FLINK-13100:

Yes, an ITCase is also necessary, and I think there should already be IT cases covering blocking partitions. The key problem is that we have two different implementations of blocking partitions used in practice, but the selection between them is made automatically based on whether the system is 32-bit or 64-bit, so the file-based path used on 32-bit systems may never have been exercised by the existing IT cases. I think we might need to make the file-based mode configurable in those IT cases.
[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881009#comment-16881009 ] Piotr Nowojski commented on FLINK-13100:

[~zjwang] besides unit testing this, can we add more IT cases using blocking subpartitions? Unit testing such behaviour might be fragile and we could lose test coverage after some refactorings, while for pipelined subpartitions it seems we have very good indirect test coverage in the ITCases.
[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881003#comment-16881003 ] zhijiang commented on FLINK-13100:

[~pnowojski] the previous deadlock did happen on our side, and we fixed it internally at Alibaba. Since the new BoundedBlockingSubpartition focused only on the mmap approach in its first version, we were not concerned about this issue then. Now that 32-bit systems can take the file-based path, it is very easy to trigger this problem while a flush is pending, as I described above.
[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880999#comment-16880999 ] zhijiang commented on FLINK-13100:

In netty's internal implementation of writeAndFlush(), the write operation always completes, but the flink buffer is not released at that point. The flush can only complete immediately after the write when the socket send buffer has enough space to hold the flushed message; otherwise the channel's interest ops include SelectionKey.OP_WRITE and the flush is completed by a following writeAndFlush(). In summary, the completion of the writeAndFlush() future only indicates that the write is done; the flush is not necessarily done. But because of the zero-copy improvement in the netty stack, the flink buffer is only recycled after the flush completes. That is what causes this issue.
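The write/flush split described above can be modeled with a small mock. This is a hypothetical sketch, not netty's actual API: MockChannel and its methods are invented for illustration, and the onFlushed callback stands in for the zero-copy recycle point that only fires once the data has actually drained to the socket.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical model of the write/flush split: the write step enqueues the
// buffer, but the buffer may only be recycled once the flush has drained it
// to the socket. Names are illustrative, not netty's API.
class MockChannel {
    private final Queue<Runnable> pendingFlushes = new ArrayDeque<>();

    // onFlushed fires only when the data is flushed, not when it is queued
    void writeAndFlush(byte[] buffer, Consumer<byte[]> onFlushed) {
        // model a full socket send buffer: the flush is deferred
        pendingFlushes.add(() -> onFlushed.accept(buffer));
    }

    boolean hasPendingFlush() {
        return !pendingFlushes.isEmpty();
    }

    // simulates the socket becoming writable again (OP_WRITE readiness)
    void completeOneFlush() {
        Runnable flush = pendingFlushes.poll();
        if (flush != null) {
            flush.run();
        }
    }
}
```

The gap between writeAndFlush() returning and completeOneFlush() firing is exactly the window in which an addCredit message can trigger a second write while the first buffer is still referenced.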
[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880988#comment-16880988 ] Piotr Nowojski commented on FLINK-13100:

I'm missing something here. Why was the previous deadlock never detected, while this error was? Regarding the potential fix, shouldn't the {{FileBufferReader}} have an {{isAvailable()}} method equivalent in this case?
[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880465#comment-16880465 ] zhijiang commented on FLINK-13100:

Yes, the previous SpilledSubpartitionView really did have the deadlock issue with two buffers. Now we have a similar issue, but it throws an IOException instead. Your understanding of the two parts above is right, but there is another case:
* When the view is created, it is enqueued into the netty thread loop for the first time, because it has both data and credit at that point, as you mentioned.
* Then a buffer is fetched from the view and writeAndFlush() is called. If the view is still in the available queue, it must have credit, because the next buffer is always readable thanks to the read-ahead. This case is fine, because we only trigger the next writeAndFlush() after the previous one completes.
* But if the view is not available (no credit) when writeAndFlush() is called, the queue in the netty thread loop becomes empty. Before the future of writeAndFlush() completes, the netty thread can still process an AddCredit message, which makes the view available again, so it is re-added to the queue and triggers the next writeAndFlush(). That means the next write is triggered while the previous buffer has not been recycled, which causes the problem. The sequence is: first writeAndFlush() pending -> addCredit (triggers the second writeAndFlush()) -> first writeAndFlush() finishes and recycles the buffer.
* I think this was caused by the improvement of reusing the flink buffer in the netty stack since release-1.5. writeAndFlush() can be broken into two steps, write and flush. Previously, when the write step finished, the flink buffer had already been copied into netty's internal buffer and could be recycled immediately, so no problem occurred even if the second writeAndFlush() was triggered before the first one completed. Now the write step keeps referencing the flink buffer in the netty stack until the flush is done.

I will try to mock this process in the relevant unit tests to verify it, and I may submit the test tomorrow to make it easier to understand.
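The failing sequence above can be reproduced with a hypothetical two-buffer reader. All names are illustrative, not Flink's actual FileBufferReader: one buffer is permanently held by the read-ahead, so a second nextBuffer() call before the in-flight buffer is recycled finds the pool empty and throws, mirroring the "Requesting new buffer before previous buffer returned" IOException.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical two-buffer reader modeling the race: with read-ahead, a second
// nextBuffer() before the pending write's buffer is recycled must fail.
class MockTwoBufferReader {
    private final Queue<byte[]> freeBuffers = new ArrayDeque<>();
    private byte[] readAhead;

    MockTwoBufferReader() {
        freeBuffers.add(new byte[8]);
        freeBuffers.add(new byte[8]);
        readAhead = freeBuffers.poll(); // read-ahead always holds one buffer
    }

    byte[] nextBuffer() throws IOException {
        byte[] next = freeBuffers.poll();
        if (next == null) {
            // both buffers in flight: one in the pending write, one read ahead
            throw new IOException(
                "Requesting new buffer before previous buffer returned.");
        }
        byte[] current = readAhead;
        readAhead = next; // read the next chunk ahead into the freed slot
        return current;
    }

    // called once the pending writeAndFlush() has actually completed
    void recycle(byte[] buffer) {
        freeBuffers.add(buffer);
    }
}
```

In the pre-1.5 copy-based stack, recycle() would effectively have been called as soon as the write step finished, closing the window in which the second nextBuffer() can arrive.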
[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880340#comment-16880340 ] Stephan Ewen commented on FLINK-13100:

[~zjwang] Could you help me understand a bit more what exactly is happening? To explain the thinking behind the current implementation: I was trying to work out how this worked with the previous implementation, SpilledSubpartitionView: https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java

My assumption was that this needs to be guaranteed to work with two buffers. The referenced implementation had two buffers as well, and it also had a blocking call to get the buffer. I was assuming that with Netty as a reader of the partition, the same thread (the netty thread for that channel) would be responsible both for writing the buffer to the socket and for returning the buffer. So once that thread blocked on buffer availability, you would have a deadlock, because the thread could never complete the ongoing write and return the buffer. Does that mean the cases where the new implementation throws an IOException are cases where the previous implementation deadlocked?

I originally understood the code such that the following would happen:
- when a partition becomes available (has data and credit), it is enqueued into the netty thread loop
- when the partition was previously not among the active partitions, a new buffer is fetched and writeAndFlush() is called. If it was already in the available set, we assume there is a write pending anyway and we don't trigger another writeAndFlush(), because we rely on the previous write's future to trigger the next one.

I assume the second part does not quite behave like that?
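The enqueue convention described in this comment can be sketched as a small model. This is a hypothetical illustration of the understanding stated above, not the real PartitionRequestQueue: a reader only kicks off a write on the transition from "not enqueued" to "enqueued"; if it is already in the set, the pending write's future is relied on to trigger the next write.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical sketch of the enqueue-once convention; names are illustrative.
class MockPartitionRequestQueue {
    private final Queue<String> availableReaders = new ArrayDeque<>();
    private final Set<String> enqueued = new HashSet<>();
    int writesTriggered = 0;

    void notifyReaderAvailable(String readerId) {
        if (enqueued.add(readerId)) {
            availableReaders.add(readerId);
            writesTriggered++; // fetch a buffer and call writeAndFlush()
        }
        // otherwise: a write is assumed pending; its future triggers the next
    }
}
```

The race reported in this issue arises precisely because a reader can leave this set (no credit) and re-enter it (addCredit) while the previous write's flush is still pending, so the "already enqueued" guard does not cover that window.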
[jira] [Commented] (FLINK-13100) Fix the unexpected IOException during FileBufferReader#nextBuffer
[ https://issues.apache.org/jira/browse/FLINK-13100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16878513#comment-16878513 ] zhijiang commented on FLINK-13100:

What do you think, [~pnowojski] [~StephanEwen]?