Repository: flink Updated Branches: refs/heads/master 684defbf3 -> 4410c04a6
[FLINK-5274] [network] Handle reader release in LocalInputChannel Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/555a6879 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/555a6879 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/555a6879 Branch: refs/heads/master Commit: 555a68793ecb3a244dfbf50a615c6d5e15c9efe4 Parents: 684defb Author: Ufuk Celebi <u...@apache.org> Authored: Wed Dec 7 13:39:18 2016 +0100 Committer: Ufuk Celebi <u...@apache.org> Committed: Wed Dec 7 17:13:59 2016 +0100 ---------------------------------------------------------------------- .../partition/consumer/LocalInputChannel.java | 13 ++++++ .../consumer/LocalInputChannelTest.java | 47 ++++++++++++++++++++ 2 files changed, 60 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/555a6879/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index c9e0179..4e14e93 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.runtime.event.TaskEvent; +import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; @@ -179,6 +180,18 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit } Buffer next = subpartitionView.getNextBuffer(); + + if (next == null) { + if (subpartitionView.isReleased()) { + throw new CancelTaskException("Consumed partition " + subpartitionView + " has been released."); + } else { + // This means there is a bug in the buffer availability + // notifications. + throw new IllegalStateException("Consumed partition has no buffers available. " + + "Number of received buffer notifications is " + numBuffersAvailable + "."); + } + } + long remaining = numBuffersAvailable.decrementAndGet(); if (remaining >= 0) { http://git-wip-us.apache.org/repos/asf/flink/blob/555a6879/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 83da3b1..35ed4c3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -347,6 +347,53 @@ public class LocalInputChannelTest { requester.join(); } + /** + * Tests that reading from a channel when after the partition has been + * released are handled and don't lead to NPEs. + */ + @Test + public void testGetNextAfterPartitionReleased() throws Exception { + ResultSubpartitionView reader = mock(ResultSubpartitionView.class); + SingleInputGate gate = mock(SingleInputGate.class); + ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); + + when(partitionManager.createSubpartitionView( + any(ResultPartitionID.class), + anyInt(), + any(BufferProvider.class), + any(BufferAvailabilityListener.class))).thenReturn(reader); + + LocalInputChannel channel = new LocalInputChannel( + gate, + 0, + new ResultPartitionID(), + partitionManager, + new TaskEventDispatcher(), + new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()); + + channel.requestSubpartition(0); + + // Null buffer but not released + when(reader.getNextBuffer()).thenReturn(null); + when(reader.isReleased()).thenReturn(false); + + try { + channel.getNextBuffer(); + fail("Did not throw expected IllegalStateException"); + } catch (IllegalStateException ignored) { + } + + // Null buffer and released + when(reader.getNextBuffer()).thenReturn(null); + when(reader.isReleased()).thenReturn(true); + + try { + channel.getNextBuffer(); + fail("Did not throw expected CancelTaskException"); + } catch (CancelTaskException ignored) { + } + } + // --------------------------------------------------------------------------------------------- private LocalInputChannel createLocalInputChannel(