[
https://issues.apache.org/jira/browse/FLINK-5326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15744827#comment-15744827
]
ASF GitHub Bot commented on FLINK-5326:
---------------------------------------
GitHub user uce opened a pull request:
https://github.com/apache/flink/pull/2996
[FLINK-5326] [network] Check release flag of parent in reader
In `PipelinedSubpartitionView`, there is a possible race between releasing the
parent subpartition and querying the view for a buffer.
Releasing the parent clears all buffers inside the locked scope, but releases
the view outside of the lock. If the view is queried for a buffer concurrently,
it might get `null`, which is only allowed if the view was released.
Because the release is only forwarded outside of the lock scope, this can
happen before the release has propagated to the view.
As a fix, the view now also checks the release status of its parent. The
spilled views handle this the same way.
This surfaced with the recent refactorings, because the previous
consumption model required multiple rounds of `get`, `registerListener`, and
`isReleased` calls, which hid this problem. The added parent `isReleased` call
does not affect normal operation, as it is only made when the returned buffer
is `null`, which only happens when the partition is consumed or released.
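To make the interleaving concrete, here is a minimal, single-threaded sketch of the race and of the added check. It is not the actual Flink code: `SketchSubpartition`, `SketchView`, `releaseUnderLock`, `releaseLocal`, and `isReleasedLocalOnly` are hypothetical names, buffers are plain strings, and it assumes the parent sets its own release flag inside the locked scope.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for the parent subpartition (not Flink's actual class).
class SketchSubpartition {
    private final ArrayDeque<String> buffers = new ArrayDeque<>();
    private SketchView view;
    private volatile boolean released;

    SketchView createView() {
        synchronized (buffers) {
            view = new SketchView(this);
            return view;
        }
    }

    void add(String buffer) {
        synchronized (buffers) {
            buffers.add(buffer);
        }
    }

    // Returns null when no buffer is queued.
    String poll() {
        synchronized (buffers) {
            return buffers.poll();
        }
    }

    boolean isReleased() {
        return released;
    }

    // First half of a release: clear the buffers and set the flag inside the lock.
    // Assumption: the parent flag is set here; the view is only notified later.
    SketchView releaseUnderLock() {
        synchronized (buffers) {
            buffers.clear();
            released = true;
            return view;
        }
    }

    void release() {
        SketchView v = releaseUnderLock();
        // Second half runs outside the lock, so a reader can slip in between.
        if (v != null) {
            v.releaseLocal();
        }
    }
}

// Hypothetical stand-in for the view that the consumer reads from.
class SketchView {
    private final SketchSubpartition parent;
    private volatile boolean released;

    SketchView(SketchSubpartition parent) {
        this.parent = parent;
    }

    String getNextBuffer() {
        return parent.poll();
    }

    void releaseLocal() {
        released = true;
    }

    // Check before the fix: only the view's own flag.
    boolean isReleasedLocalOnly() {
        return released;
    }

    // Check after the fix: also consult the parent's release status.
    boolean isReleased() {
        return released || parent.isReleased();
    }
}
```

If a reader calls `getNextBuffer()` after `releaseUnderLock()` has returned but before `releaseLocal()` runs, it sees `null` while `isReleasedLocalOnly()` is still `false`, which matches the state the consumer reports as a bug. The combined `isReleased()` already returns `true` at that point.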
---
This needs to be applied to `release-1.1`, too.
cc @StephanEwen
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/uce/flink 5326-illegal_state
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2996.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2996
----
commit d965d5abdc389e9b65fd35a69bb16bfb71008504
Author: Ufuk Celebi
Date: 2016-12-13T10:26:47Z
[FLINK-5326] [network] Check release flag of parent in reader
----
> IllegalStateException: Bug in Netty consumer logic: reader queue got notified
> by partition about available data, but none was available
> ----------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-5326
> URL: https://issues.apache.org/jira/browse/FLINK-5326
> Project: Flink
> Issue Type: Bug
> Components: Network
> Affects Versions: 1.2.0, 1.1.4
> Reporter: Robert Metzger
> Assignee: Ufuk Celebi
> Labels: qa
>
> {code}
> 2016-12-10 23:56:39,056 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - Source: control events generator (1/40) (3360ced43a57fed83904f22e93281ce0): Releasing ResultPartition e585300594a68036b0983cefaf048e17@3360ced43a57fed83904f22e93281ce0 [PIPELINED, 1 subpartitions, 0 pending references].
> 2016-12-10 23:56:39,056 INFO org.apache.flink.runtime.taskmanager.Task - dynamic filter (1/40) (b1b7284e0b4a6ba08a16c50dcf13ff0d) switched from RUNNING to FAILED.
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Fatal error at remote task manager 'permanent-qa-cluster-2wv1/10.240.0.27:45062'.
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:229)
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Bug in Netty consumer logic: reader queue got notified by partition about available data, but none was available.
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:177)
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:111)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>     at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>     at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>     at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>     at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>     at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>     at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:308)
>     at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:294)
>     at io.netty.channel.DefaultChannelPipeline.fireUserEventTriggered(DefaultChannelPipeline.java:841)
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue$1.run(PartitionRequestQueue.java:84)
>     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>     ... 2 more
> 2016-12-10 23:56:39,073 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for dynamic filter (1/40) (b1b7284e0b4a6ba08a16c50dcf13ff0d).
> {code}