Repository: flink Updated Branches: refs/heads/master 58b9a3772 -> d433ba9f0
[FLINK-2177] [runtime] Fix possible NPE when closing Netty channel, before it is active Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d433ba9f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d433ba9f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d433ba9f Branch: refs/heads/master Commit: d433ba9f032e5361ae894562b7a8be13cd3efe13 Parents: 58b9a37 Author: Ufuk Celebi <u...@apache.org> Authored: Mon Jun 8 07:14:07 2015 +0200 Committer: Ufuk Celebi <u...@apache.org> Committed: Mon Jun 8 07:14:07 2015 +0200 ---------------------------------------------------------------------- .../netty/PartitionRequestClientHandler.java | 6 +++++- .../netty/PartitionRequestClientHandlerTest.java | 16 ++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d433ba9f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java index 508cac9..51b436b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java @@ -66,7 +66,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter { */ private final ConcurrentMap<InputChannelID, InputChannelID> cancelled = Maps.newConcurrentMap(); - private ChannelHandlerContext ctx; + private volatile ChannelHandlerContext ctx; // ------------------------------------------------------------------------ // Input channel/receiver registration @@ -85,6 +85,10 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter { } void cancelRequestFor(InputChannelID inputChannelId) { + if (inputChannelId == null || ctx == null) { + return; + } + if (cancelled.putIfAbsent(inputChannelId, inputChannelId) == null) { ctx.writeAndFlush(new NettyMessage.CancelPartitionRequest(inputChannelId)); } http://git-wip-us.apache.org/repos/asf/flink/blob/d433ba9f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java index b8e9f25..2c08cc5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java @@ -142,6 +142,22 @@ public class PartitionRequestClientHandlerTest { verify(inputChannel, times(1)).onFailedPartitionRequest(); } + @Test + public void testCancelBeforeActive() throws Exception { + + final RemoteInputChannel inputChannel = mock(RemoteInputChannel.class); + when(inputChannel.getInputChannelId()).thenReturn(new InputChannelID()); + + final PartitionRequestClientHandler client = new PartitionRequestClientHandler(); + client.addInputChannel(inputChannel); + + // Don't throw NPE + client.cancelRequestFor(null); + + // Don't throw NPE, because channel is not active yet + client.cancelRequestFor(inputChannel.getInputChannelId()); + } + // --------------------------------------------------------------------------------------------- /**