Repository: flink
Updated Branches:
  refs/heads/master 684defbf3 -> 4410c04a6


[FLINK-5274] [network] Handle reader release in LocalInputChannel


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/555a6879
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/555a6879
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/555a6879

Branch: refs/heads/master
Commit: 555a68793ecb3a244dfbf50a615c6d5e15c9efe4
Parents: 684defb
Author: Ufuk Celebi <u...@apache.org>
Authored: Wed Dec 7 13:39:18 2016 +0100
Committer: Ufuk Celebi <u...@apache.org>
Committed: Wed Dec 7 17:13:59 2016 +0100

----------------------------------------------------------------------
 .../partition/consumer/LocalInputChannel.java   | 13 ++++++
 .../consumer/LocalInputChannelTest.java         | 47 ++++++++++++++++++++
 2 files changed, 60 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/555a6879/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index c9e0179..4e14e93 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
@@ -179,6 +180,18 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
                }
 
                Buffer next = subpartitionView.getNextBuffer();
+
+               if (next == null) {
+                       if (subpartitionView.isReleased()) {
+                               throw new CancelTaskException("Consumed 
partition " + subpartitionView + " has been released.");
+                       } else {
+                               // This means there is a bug in the buffer 
availability
+                               // notifications.
+                               throw new IllegalStateException("Consumed 
partition has no buffers available. " +
+                                       "Number of received buffer 
notifications is " + numBuffersAvailable + ".");
+                       }
+               }
+
                long remaining = numBuffersAvailable.decrementAndGet();
 
                if (remaining >= 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/555a6879/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 83da3b1..35ed4c3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -347,6 +347,53 @@ public class LocalInputChannelTest {
                requester.join();
        }
 
+       /**
+        * Tests that reading from a channel after the partition has been
+        * released is handled and doesn't lead to NPEs.
+        */
+       @Test
+       public void testGetNextAfterPartitionReleased() throws Exception {
+               ResultSubpartitionView reader = 
mock(ResultSubpartitionView.class);
+               SingleInputGate gate = mock(SingleInputGate.class);
+               ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
+
+               when(partitionManager.createSubpartitionView(
+                               any(ResultPartitionID.class),
+                               anyInt(),
+                               any(BufferProvider.class),
+                               
any(BufferAvailabilityListener.class))).thenReturn(reader);
+
+               LocalInputChannel channel = new LocalInputChannel(
+                       gate,
+                       0,
+                       new ResultPartitionID(),
+                       partitionManager,
+                       new TaskEventDispatcher(),
+                       new 
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+
+               channel.requestSubpartition(0);
+
+               // Null buffer, view not released: expect IllegalStateException
+               when(reader.getNextBuffer()).thenReturn(null);
+               when(reader.isReleased()).thenReturn(false);
+
+               try {
+                       channel.getNextBuffer();
+                       fail("Did not throw expected IllegalStateException");
+               } catch (IllegalStateException ignored) {
+               }
+
+               // Null buffer, view released: expect CancelTaskException
+               when(reader.getNextBuffer()).thenReturn(null);
+               when(reader.isReleased()).thenReturn(true);
+
+               try {
+                       channel.getNextBuffer();
+                       fail("Did not throw expected CancelTaskException");
+               } catch (CancelTaskException ignored) {
+               }
+       }
+
        // 
---------------------------------------------------------------------------------------------
 
        private LocalInputChannel createLocalInputChannel(

Reply via email to