[jira] [Commented] (FLINK-18832) BoundedBlockingSubpartition does not work with StreamTask

2020-08-13 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176817#comment-17176817
 ] 

Zhijiang commented on FLINK-18832:
--

[~pnowojski] I agree with your proposal. It would be nice to give a friendly 
error message for the current limitation. I will submit a PR for it.

> BoundedBlockingSubpartition does not work with StreamTask
> -
>
> Key: FLINK-18832
> URL: https://issues.apache.org/jira/browse/FLINK-18832
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network, Runtime / Task
> Affects Versions: 1.10.1, 1.12.0, 1.11.1
> Reporter: Till Rohrmann
> Assignee: Zhijiang
> Priority: Major
>
> The {{BoundedBlockingSubpartition}} does not work with a {{StreamTask}} 
> because the {{StreamTask}} instantiates an {{OutputFlusher}} which 
> concurrently accesses the {{BoundedBlockingSubpartition}}. This concurrency 
> can lead to a double closing of the underlying {{BufferConsumer}} which 
> manifests in this stack trace:
> {code}
> [9:15 PM] Caused by: org.apache.flink.shaded.netty4.io.netty.util.IllegalReferenceCountException: refCnt: 0, increment: 1
>   at org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.retain0(ReferenceCountUpdater.java:123)
>   at org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.retain(ReferenceCountUpdater.java:110)
>   at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.retain(AbstractReferenceCountedByteBuf.java:80)
>   at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.retainBuffer(NetworkBuffer.java:174)
>   at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.retainBuffer(NetworkBuffer.java:47)
>   at org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.retainBuffer(ReadOnlySlicedNetworkBuffer.java:127)
>   at org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.retainBuffer(ReadOnlySlicedNetworkBuffer.java:41)
>   at org.apache.flink.runtime.io.network.buffer.BufferConsumer.build(BufferConsumer.java:108)
>   at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.writeAndCloseBufferConsumer(BoundedBlockingSubpartition.java:156)
>   at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.flushCurrentBuffer(BoundedBlockingSubpartition.java:144)
>   at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.flush(BoundedBlockingSubpartition.java:135)
>   at org.apache.flink.runtime.io.network.partition.ResultPartition.flushAll(ResultPartition.java:245)
>   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flushAll(RecordWriter.java:183)
>   at org.apache.flink.streaming.runtime.io.RecordWriterOutput.flush(RecordWriterOutput.java:156)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.flushOutputs(OperatorChain.java:344)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:602)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18832) BoundedBlockingSubpartition does not work with StreamTask

2020-08-12 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176116#comment-17176116
 ] 

Piotr Nowojski commented on FLINK-18832:


+1

[~zjwang] should we maybe add a state check somewhere in the code, to fail 
early with a nice error message "buffer timeout can not be used with blocking 
subpartition"? Ideally somewhere in the {{BoundedSubpartitionXXX}} constructor, 
but I'm not sure whether we have easy access there to the task's configuration.
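
A minimal sketch of the fail-early check proposed above, written as standalone Java rather than against Flink's real classes. The names {{BlockingPartitionCheck}}, {{PartitionType}} and {{checkBufferTimeout}} are hypothetical and only illustrate the idea; where such a check would actually live in Flink is the open question raised in this comment. The sketch assumes, as stated elsewhere in this thread, that a buffer timeout of -1 disables the output flusher.

```java
final class BlockingPartitionCheck {

    /** Hypothetical stand-in for the partition types discussed in the ticket. */
    enum PartitionType { PIPELINED, BLOCKING }

    /** Per the thread, a buffer timeout of -1 disables the output flusher. */
    static final long FLUSHER_DISABLED = -1L;

    /**
     * Rejects the combination of a blocking partition with an active buffer
     * timeout before any data is written, instead of failing later with an
     * IllegalReferenceCountException.
     */
    static void checkBufferTimeout(PartitionType type, long bufferTimeoutMillis) {
        if (type == PartitionType.BLOCKING && bufferTimeoutMillis != FLUSHER_DISABLED) {
            throw new IllegalStateException(
                "Buffer timeout can not be used with blocking subpartition"
                    + " (configured timeout: " + bufferTimeoutMillis + " ms)."
                    + " Set the buffer timeout to -1.");
        }
    }

    public static void main(String[] args) {
        // Allowed: pipelined partitions may use the flusher.
        checkBufferTimeout(PartitionType.PIPELINED, 100L);
        // Allowed: blocking partition with the flusher disabled.
        checkBufferTimeout(PartitionType.BLOCKING, FLUSHER_DISABLED);
        try {
            // Rejected: blocking partition with an active flusher.
            checkBufferTimeout(PartitionType.BLOCKING, 100L);
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Throwing at configuration time keeps the failure close to its cause, so the user sees the illegal combination rather than a reference-count error from the network stack.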



[jira] [Commented] (FLINK-18832) BoundedBlockingSubpartition does not work with StreamTask

2020-08-11 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17175960#comment-17175960
 ] 

Zhijiang commented on FLINK-18832:
--

Thanks for the explanation [~dwysakowicz]. 

If so, I think we can bypass this issue: if we later unify the DataStream API 
to run batch jobs, it can explicitly call setBufferTimeout(-1). At the moment, 
the Blink planner and the BatchTask from the DataSet API already call 
setBufferTimeout(-1) explicitly as well.

If nobody has other concerns, I will close this ticket now. If we want to 
support buffer timeout for batch jobs in the future, we can address it then.



[jira] [Commented] (FLINK-18832) BoundedBlockingSubpartition does not work with StreamTask

2020-08-11 Thread Dawid Wysakowicz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17175504#comment-17175504
 ] 

Dawid Wysakowicz commented on FLINK-18832:
--

As far as I can tell, that cannot occur currently. The Blink batch planner, 
which is the only user of lazy scheduling with blocking data exchanges, 
disables the output flushers by setting {{execEnv.setBufferTimeout(-1);}}. I 
am working on exposing the "{{BATCH}}" behaviour in the DataStream API as well, 
but I did not set this parameter because I was not aware of the consequences. 
I think it is not a pressing issue if we say it is an illegal combination.




[jira] [Commented] (FLINK-18832) BoundedBlockingSubpartition does not work with StreamTask

2020-08-11 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17175495#comment-17175495
 ] 

Till Rohrmann commented on FLINK-18832:
---

[~dwysakowicz] reported this issue when running a {{DataStream}} job with lazy 
scheduling and blocking data exchanges. Not sure whether this configuration can 
actually occur atm.



[jira] [Commented] (FLINK-18832) BoundedBlockingSubpartition does not work with StreamTask

2020-08-10 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17174422#comment-17174422
 ] 

Piotr Nowojski commented on FLINK-18832:


[~trohrmann] [~zjwang] why has this bug appeared, and in what scenario? What 
are the use cases? Do we need to support the flusher for batch jobs?



[jira] [Commented] (FLINK-18832) BoundedBlockingSubpartition does not work with StreamTask

2020-08-06 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17172083#comment-17172083
 ] 

Piotr Nowojski commented on FLINK-18832:


Thanks [~zjwang] for reminding me about this older PR of mine (FLINK-15750). 
There were some issues that I didn't have time to resolve (it was my side 
project). I can dig out this PR tomorrow, try to rebase it on the latest master 
and check which issues remain to be addressed.



[jira] [Commented] (FLINK-18832) BoundedBlockingSubpartition does not work with StreamTask

2020-08-05 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171894#comment-17171894
 ] 

Zhijiang commented on FLINK-18832:
--

Thanks for reporting this bug [~trohrmann]!

By design, I guess the BoundedBlockingSubpartition assumes that the 
`#flushCurrentBuffer` method is never called concurrently. In practice, 
however, the task thread and the flusher thread can both call this method at 
the same time. I can think of some options for resolving it:

* Disable the flusher thread for batch jobs. It brings no latency benefit, 
because with the current scheduling the downstream only requests the partition 
after the upstream finishes. It can even hurt, since the upstream writer spills 
a partially filled buffer whenever a flush is triggered.
* As a long-term goal, the flusher thread should be replaced by the mailbox 
model, which would avoid the concurrency issue even if the flusher timeout 
stays valid for batch jobs.
* Break the previous assumption and allow concurrent access to 
`BoundedBlockingSubpartition#flushCurrentBuffer`.

If we can realize the second option soon, then we can bypass this bug. I 
remember that [~pnowojski] already submitted a PR for it, but it has not been 
merged yet. If that cannot be done in a short time, then I prefer the first 
option as a workaround. WDYT?
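
To make the third option concrete, here is a toy sketch in plain Java of a flush method that tolerates concurrent callers. It is not Flink's implementation: {{GuardedFlushSketch}}, {{RefCountedBuffer}} and {{ToySubpartition}} are hypothetical stand-ins, and the lock-based guard is only one assumed way to allow concurrent access. The point it illustrates is that the current buffer is taken and cleared under a lock, so only one of the two threads can close it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

final class GuardedFlushSketch {

    /** Toy reference-counted buffer; closing it twice is an error. */
    static final class RefCountedBuffer {
        private final AtomicInteger refCnt = new AtomicInteger(1);

        void close() {
            // Decrement only while the count is positive.
            int previous = refCnt.getAndUpdate(c -> c > 0 ? c - 1 : c);
            if (previous == 0) {
                throw new IllegalStateException("refCnt: 0, buffer already closed");
            }
        }
    }

    /** Toy subpartition whose flush may be called from several threads. */
    static final class ToySubpartition {
        private final Object lock = new Object();
        private RefCountedBuffer current;
        private int flushedBuffers;

        void add(RefCountedBuffer buffer) {
            synchronized (lock) {
                current = buffer;
            }
        }

        void flushCurrentBuffer() {
            synchronized (lock) {
                if (current == null) {
                    return; // another thread already flushed this buffer
                }
                RefCountedBuffer toWrite = current;
                current = null;
                flushedBuffers++;
                toWrite.close();
            }
        }

        int flushedBuffers() {
            synchronized (lock) {
                return flushedBuffers;
            }
        }
    }

    /** Lets a "task" thread and a "flusher" thread flush the same buffer. */
    static int flushFromTwoThreads() {
        ToySubpartition subpartition = new ToySubpartition();
        subpartition.add(new RefCountedBuffer());
        CountDownLatch start = new CountDownLatch(1);
        AtomicInteger failures = new AtomicInteger();
        Runnable flush = () -> {
            try {
                start.await();
                subpartition.flushCurrentBuffer();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                failures.incrementAndGet();
            } catch (RuntimeException e) {
                failures.incrementAndGet();
            }
        };
        Thread task = new Thread(flush, "task-thread");
        Thread flusher = new Thread(flush, "output-flusher");
        task.start();
        flusher.start();
        start.countDown();
        try {
            task.join();
            flusher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        if (failures.get() != 0) {
            throw new IllegalStateException("double close detected");
        }
        return subpartition.flushedBuffers();
    }

    public static void main(String[] args) {
        System.out.println("buffers flushed: " + flushFromTwoThreads());
    }
}
```

Compared with the first two options, this approach adds locking on the write path, which is presumably why it is listed last.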



[jira] [Commented] (FLINK-18832) BoundedBlockingSubpartition does not work with StreamTask

2020-08-05 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171516#comment-17171516
 ] 

Till Rohrmann commented on FLINK-18832:
---

cc [~pnowojski] and [~zjwang]
