[ 
https://issues.apache.org/jira/browse/FLINK-18832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171894#comment-17171894
 ] 

Zhijiang edited comment on FLINK-18832 at 8/6/20, 4:15 AM:
-----------------------------------------------------------

Thanks for reporting this bug [~trohrmann]!

By design I guess the BoundedBlockingSubpartition assumes there is no concurrent 
access to its `#flushCurrentBuffer` method. But in fact the task thread and the 
flusher thread can call this method concurrently. I can think of some options 
for resolving it:

* Disable the flusher thread for batch jobs. It brings no latency benefit, 
because under the current scheduling the downstream only requests the partition 
after the upstream finishes. It can even harm the upstream writer, which has to 
spill a partially filled buffer whenever a flush is triggered.
* As a long-term goal, the flusher thread should be delegated to the mailbox 
model, so that we avoid the concurrency issue even if the flusher timeout stays 
valid for batch jobs.
* Break the previous assumption and allow concurrent access to 
`BoundedBlockingSubpartition#flushCurrentBuffer`.
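To make the second option more concrete, here is a minimal Java sketch, assuming a plain `BlockingQueue` stands in for the task's mailbox. The flusher thread only enqueues a flush request, and the task thread runs it between records, so `flushCurrentBuffer` is never entered from two threads. `MailboxFlushSketch`, `requestFlush` and `runTaskLoop` are hypothetical names, not Flink's actual mailbox API.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of option 2: the flusher thread never calls flushCurrentBuffer
// itself; it only enqueues a "flush" mail that the task thread executes.
// This is NOT Flink's MailboxProcessor API.
public class MailboxFlushSketch {
    // Mails may be submitted from any thread but are run only by the task thread.
    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    // Records which threads ever executed the flush (expected: only "task").
    final Set<String> flushingThreads = ConcurrentHashMap.newKeySet();
    int flushCount = 0; // mutated only on the task thread

    // Deliberately not thread-safe, like the subpartition method in this issue.
    private void flushCurrentBuffer() {
        flushingThreads.add(Thread.currentThread().getName());
        flushCount++;
    }

    // Called by the flusher thread instead of flushing directly.
    void requestFlush() {
        mailbox.add(this::flushCurrentBuffer);
    }

    private void drainMailbox() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }

    // Task thread: run pending mails between records, then flush at end of input.
    void runTaskLoop(int records, Thread flusher) {
        for (int i = 0; i < records; i++) {
            drainMailbox();
            // ... write record i into the current buffer (omitted) ...
        }
        try {
            flusher.join(); // no further flush requests after this point
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        drainMailbox();       // late flush requests, still on the task thread
        flushCurrentBuffer(); // final flush at end of input
    }

    static MailboxFlushSketch runDemo(int flushRequests, int records) throws InterruptedException {
        MailboxFlushSketch sketch = new MailboxFlushSketch();
        Thread flusher = new Thread(() -> {
            for (int i = 0; i < flushRequests; i++) {
                sketch.requestFlush();
            }
        }, "output-flusher");
        Thread task = new Thread(() -> sketch.runTaskLoop(records, flusher), "task");
        flusher.start();
        task.start();
        task.join();
        return sketch;
    }

    public static void main(String[] args) throws InterruptedException {
        MailboxFlushSketch sketch = runDemo(100, 1_000);
        System.out.println(sketch.flushCount + " flushes on " + sketch.flushingThreads);
    }
}
```

Running `main` should print `101 flushes on [task]`: the 100 requests from the flusher plus the final flush, all executed by the task thread.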

If we can realize the second option soon, then we can bypass this bug. I 
remember [~pnowojski] already submitted a PR for it before, but it has not been 
merged yet. If this cannot be realized in a short time, then I prefer the first 
option as a workaround. WDYT?



> BoundedBlockingSubpartition does not work with StreamTask
> ---------------------------------------------------------
>
>                 Key: FLINK-18832
>                 URL: https://issues.apache.org/jira/browse/FLINK-18832
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network, Runtime / Task
>    Affects Versions: 1.10.1, 1.12.0, 1.11.1
>            Reporter: Till Rohrmann
>            Priority: Major
>
> The {{BoundedBlockingSubpartition}} does not work with a {{StreamTask}} 
> because the {{StreamTask}} instantiates an {{OutputFlusher}} which 
> concurrently accesses the {{BoundedBlockingSubpartition}}. This concurrency 
> can lead to a double closing of the underlying {{BufferConsumer}} which 
> manifests in this stack trace:
> {code}
> Caused by: org.apache.flink.shaded.netty4.io.netty.util.IllegalReferenceCountException: refCnt: 0, increment: 1
>       at org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.retain0(ReferenceCountUpdater.java:123)
>       at org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.retain(ReferenceCountUpdater.java:110)
>       at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.retain(AbstractReferenceCountedByteBuf.java:80)
>       at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.retainBuffer(NetworkBuffer.java:174)
>       at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.retainBuffer(NetworkBuffer.java:47)
>       at org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.retainBuffer(ReadOnlySlicedNetworkBuffer.java:127)
>       at org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.retainBuffer(ReadOnlySlicedNetworkBuffer.java:41)
>       at org.apache.flink.runtime.io.network.buffer.BufferConsumer.build(BufferConsumer.java:108)
>       at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.writeAndCloseBufferConsumer(BoundedBlockingSubpartition.java:156)
>       at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.flushCurrentBuffer(BoundedBlockingSubpartition.java:144)
>       at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.flush(BoundedBlockingSubpartition.java:135)
>       at org.apache.flink.runtime.io.network.partition.ResultPartition.flushAll(ResultPartition.java:245)
>       at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flushAll(RecordWriter.java:183)
>       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.flush(RecordWriterOutput.java:156)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain.flushOutputs(OperatorChain.java:344)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:602)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
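The double close described in the issue can be modeled without Flink. In this hypothetical sketch, a `ToyBuffer` with an atomic reference count stands in for the `BufferConsumer` and its network buffer, and a `CyclicBarrier` forces the task thread and the flusher thread to both read the current consumer before either clears it. The unsafe variant then fails with a reference-count error in exactly one of the two threads. The safe variant is one possible take on the third option from the comment, taking ownership with `AtomicReference#getAndSet(null)` so that only one caller closes the consumer. None of these classes exist in Flink.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Toy model of the double close in FLINK-18832; every class here is hypothetical.
public class DoubleCloseSketch {

    // Stands in for the consumer's network buffer; rejects any operation at refCnt 0.
    static final class ToyBuffer {
        final AtomicInteger refCnt = new AtomicInteger(1); // reference held by the consumer

        void retain() {
            refCnt.updateAndGet(n -> {
                if (n == 0) throw new IllegalStateException("refCnt: 0, increment: 1");
                return n + 1;
            });
        }

        void release() {
            refCnt.updateAndGet(n -> {
                if (n == 0) throw new IllegalStateException("refCnt: 0, decrement: 1");
                return n - 1;
            });
        }
    }

    // Loosely mirrors writeAndCloseBufferConsumer: build (retain), write, release, close.
    static void writeAndClose(ToyBuffer consumer) {
        consumer.retain();  // like BufferConsumer#build
        // ... write the built buffer (omitted) ...
        consumer.release(); // drop the built buffer after writing
        consumer.release(); // like closing the BufferConsumer
    }

    static void await(CyclicBarrier barrier) {
        try {
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
    }

    // Check-then-act on a field that is assumed to be touched by one thread only.
    static final class UnsafeSubpartition {
        volatile ToyBuffer current = new ToyBuffer();

        void flushCurrentBuffer(CyclicBarrier raceWindow) {
            ToyBuffer c = current; // both threads read the same consumer...
            await(raceWindow);     // ...before either of them clears the field
            if (c != null) {
                writeAndClose(c);
                current = null;
            }
        }
    }

    // One possible take on option 3: take ownership atomically, so only one caller closes.
    static final class SafeSubpartition {
        final AtomicReference<ToyBuffer> current = new AtomicReference<>(new ToyBuffer());

        void flushCurrentBuffer(CyclicBarrier raceWindow) {
            await(raceWindow);                     // both threads arrive together
            ToyBuffer c = current.getAndSet(null); // exactly one caller gets non-null
            if (c != null) {
                writeAndClose(c);
            }
        }
    }

    // Runs the flush from a "task" and an "output-flusher" thread; returns the failure count.
    static int race(Consumer<CyclicBarrier> flush) throws InterruptedException {
        CyclicBarrier raceWindow = new CyclicBarrier(2);
        AtomicInteger failures = new AtomicInteger();
        Runnable body = () -> {
            try {
                flush.accept(raceWindow);
            } catch (IllegalStateException e) {
                failures.incrementAndGet();
            }
        };
        Thread task = new Thread(body, "task");
        Thread flusher = new Thread(body, "output-flusher");
        task.start();
        flusher.start();
        task.join();
        flusher.join();
        return failures.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("unsafe failures: " + race(new UnsafeSubpartition()::flushCurrentBuffer));
        System.out.println("safe failures: " + race(new SafeSubpartition()::flushCurrentBuffer));
    }
}
```

Running `main` should print `unsafe failures: 1` and then `safe failures: 0`. Whichever thread finishes its build/release/close sequence first succeeds, and the other one hits the zero reference count, much like the `IllegalReferenceCountException` in the trace.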



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
