This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 72d522efeef956cadeb8fe53778985855f8ee738
Author: Nico Kruber <n...@data-artisans.com>
AuthorDate: Tue Sep 18 12:10:21 2018 +0200

    [hotfix][network][tests] split PipelinedSubpartitionTest for better initialization
    
    - add PipelinedSubpartitionWithReadViewTest which always creates a subpartition,
    an availability listener, and a read view before each test and cleans up after
    each test
    - remove mockito use from testBasicPipelinedProduceConsumeLogic()
---
 .../partition/PipelinedSubpartitionTest.java       | 314 +--------------------
 .../PipelinedSubpartitionWithReadViewTest.java     | 276 ++++++++++++++++++
 .../io/network/partition/SubpartitionTestBase.java |   2 +-
 3 files changed, 292 insertions(+), 300 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index b75bb7a..82f61ab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -40,19 +40,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
-import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer;
-import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder;
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
-import static 
org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
 import static org.apache.flink.util.FutureUtil.waitForAll;
 import static org.apache.flink.util.Preconditions.checkState;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -62,6 +55,8 @@ import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link PipelinedSubpartition}.
+ *
+ * @see PipelinedSubpartitionWithReadViewTest
  */
 public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
@@ -80,189 +75,6 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
                return new PipelinedSubpartition(0, parent);
        }
 
-       @Test(expected = IllegalStateException.class)
-       public void testAddTwoNonFinishedBuffer() throws Exception {
-               final ResultSubpartition subpartition = createSubpartition();
-               AwaitableBufferAvailablityListener availablityListener = new 
AwaitableBufferAvailablityListener();
-               ResultSubpartitionView readView = 
subpartition.createReadView(availablityListener);
-               availablityListener.resetNotificationCounters();
-
-               try {
-                       
subpartition.add(createBufferBuilder().createBufferConsumer());
-                       
subpartition.add(createBufferBuilder().createBufferConsumer());
-                       assertNull(readView.getNextBuffer());
-               } finally {
-                       subpartition.release();
-               }
-       }
-
-       @Test
-       public void testAddEmptyNonFinishedBuffer() throws Exception {
-               final ResultSubpartition subpartition = createSubpartition();
-               AwaitableBufferAvailablityListener availablityListener = new 
AwaitableBufferAvailablityListener();
-               ResultSubpartitionView readView = 
subpartition.createReadView(availablityListener);
-               availablityListener.resetNotificationCounters();
-
-               try {
-                       assertEquals(0, 
availablityListener.getNumNotifications());
-
-                       BufferBuilder bufferBuilder = createBufferBuilder();
-                       subpartition.add(bufferBuilder.createBufferConsumer());
-
-                       assertEquals(0, 
availablityListener.getNumNotifications());
-                       assertNull(readView.getNextBuffer());
-
-                       bufferBuilder.finish();
-                       bufferBuilder = createBufferBuilder();
-                       subpartition.add(bufferBuilder.createBufferConsumer());
-
-                       assertEquals(1, 
availablityListener.getNumNotifications()); // notification from finishing 
previous buffer.
-                       assertNull(readView.getNextBuffer());
-                       assertEquals(1, subpartition.getBuffersInBacklog());
-               } finally {
-                       readView.releaseAllResources();
-                       subpartition.release();
-               }
-       }
-
-       @Test
-       public void testAddNonEmptyNotFinishedBuffer() throws Exception {
-               final ResultSubpartition subpartition = createSubpartition();
-               AwaitableBufferAvailablityListener availablityListener = new 
AwaitableBufferAvailablityListener();
-               ResultSubpartitionView readView = 
subpartition.createReadView(availablityListener);
-               availablityListener.resetNotificationCounters();
-
-               try {
-                       assertEquals(0, 
availablityListener.getNumNotifications());
-
-                       BufferBuilder bufferBuilder = createBufferBuilder();
-                       
bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024));
-                       subpartition.add(bufferBuilder.createBufferConsumer());
-
-                       // note that since the buffer builder is not finished, 
there is still a retained instance!
-                       assertNextBuffer(readView, 1024, false, 1, false, 
false);
-                       assertEquals(1, subpartition.getBuffersInBacklog());
-               } finally {
-                       readView.releaseAllResources();
-                       subpartition.release();
-               }
-       }
-
-       /**
-        * Normally moreAvailable flag from InputChannel should ignore non 
finished BufferConsumers, otherwise we would
-        * busy loop on the unfinished BufferConsumers.
-        */
-       @Test
-       public void testUnfinishedBufferBehindFinished() throws Exception {
-               final ResultSubpartition subpartition = createSubpartition();
-               AwaitableBufferAvailablityListener availablityListener = new 
AwaitableBufferAvailablityListener();
-               ResultSubpartitionView readView = 
subpartition.createReadView(availablityListener);
-
-               try {
-                       subpartition.add(createFilledBufferConsumer(1025)); // 
finished
-                       
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // 
not finished
-
-                       assertThat(availablityListener.getNumNotifications(), 
greaterThan(0L));
-                       assertNextBuffer(readView, 1025, false, 1, false, true);
-                       // not notified, but we could still access the 
unfinished buffer
-                       assertNextBuffer(readView, 1024, false, 1, false, 
false);
-                       assertNoNextBuffer(readView);
-               } finally {
-                       subpartition.release();
-               }
-       }
-
-       /**
-        * After flush call unfinished BufferConsumers should be reported as 
available, otherwise we might not flush some
-        * of the data.
-        */
-       @Test
-       public void testFlushWithUnfinishedBufferBehindFinished() throws 
Exception {
-               final ResultSubpartition subpartition = createSubpartition();
-               AwaitableBufferAvailablityListener availablityListener = new 
AwaitableBufferAvailablityListener();
-               ResultSubpartitionView readView = 
subpartition.createReadView(availablityListener);
-
-               try {
-                       subpartition.add(createFilledBufferConsumer(1025)); // 
finished
-                       
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // 
not finished
-                       long oldNumNotifications = 
availablityListener.getNumNotifications();
-                       subpartition.flush();
-                       // buffer queue is > 1, should already be notified, no 
further notification necessary
-                       assertThat(oldNumNotifications, greaterThan(0L));
-                       assertEquals(oldNumNotifications, 
availablityListener.getNumNotifications());
-
-                       assertNextBuffer(readView, 1025, true, 1, false, true);
-                       assertNextBuffer(readView, 1024, false, 1, false, 
false);
-                       assertNoNextBuffer(readView);
-               } finally {
-                       subpartition.release();
-               }
-       }
-
-       /**
-        * A flush call with a buffer size of 1 should always notify consumers 
(unless already flushed).
-        */
-       @Test
-       public void testFlushWithUnfinishedBufferBehindFinished2() throws 
Exception {
-               final ResultSubpartition subpartition = createSubpartition();
-               AwaitableBufferAvailablityListener availablityListener = new 
AwaitableBufferAvailablityListener();
-               ResultSubpartitionView readView = 
subpartition.createReadView(availablityListener);
-
-               try {
-                       // no buffers -> no notification or any other effects
-                       subpartition.flush();
-                       assertEquals(0, 
availablityListener.getNumNotifications());
-
-                       subpartition.add(createFilledBufferConsumer(1025)); // 
finished
-                       
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // 
not finished
-
-                       assertNextBuffer(readView, 1025, false, 1, false, true);
-
-                       long oldNumNotifications = 
availablityListener.getNumNotifications();
-                       subpartition.flush();
-                       // buffer queue is 1 again -> need to flush
-                       assertEquals(oldNumNotifications + 1, 
availablityListener.getNumNotifications());
-                       subpartition.flush();
-                       // calling again should not flush again
-                       assertEquals(oldNumNotifications + 1, 
availablityListener.getNumNotifications());
-
-                       assertNextBuffer(readView, 1024, false, 1, false, 
false);
-                       assertNoNextBuffer(readView);
-               } finally {
-                       subpartition.release();
-               }
-       }
-
-       @Test
-       public void testMultipleEmptyBuffers() throws Exception {
-               final ResultSubpartition subpartition = createSubpartition();
-               AwaitableBufferAvailablityListener availablityListener = new 
AwaitableBufferAvailablityListener();
-               ResultSubpartitionView readView = 
subpartition.createReadView(availablityListener);
-               availablityListener.resetNotificationCounters();
-
-               try {
-                       assertEquals(0, 
availablityListener.getNumNotifications());
-
-                       subpartition.add(createFilledBufferConsumer(0));
-
-                       assertEquals(1, 
availablityListener.getNumNotifications());
-                       subpartition.add(createFilledBufferConsumer(0));
-                       assertEquals(2, 
availablityListener.getNumNotifications());
-
-                       subpartition.add(createFilledBufferConsumer(0));
-                       assertEquals(2, 
availablityListener.getNumNotifications());
-                       assertEquals(3, subpartition.getBuffersInBacklog());
-
-                       subpartition.add(createFilledBufferConsumer(1024));
-                       assertEquals(2, 
availablityListener.getNumNotifications());
-
-                       assertNextBuffer(readView, 1024, false, 0, false, true);
-               } finally {
-                       readView.releaseAllResources();
-                       subpartition.release();
-               }
-       }
-
        @Test
        public void testIllegalReadViewRequest() throws Exception {
                final PipelinedSubpartition subpartition = createSubpartition();
@@ -278,100 +90,23 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
                }
        }
 
+       /**
+        * Verifies that the isReleased() check of the view checks the parent
+        * subpartition.
+        */
        @Test
-       public void testEmptyFlush() throws Exception {
-               final PipelinedSubpartition subpartition = createSubpartition();
+       public void testIsReleasedChecksParent() {
+               PipelinedSubpartition subpartition = 
mock(PipelinedSubpartition.class);
 
-               AwaitableBufferAvailablityListener listener = new 
AwaitableBufferAvailablityListener();
-               subpartition.createReadView(listener);
-               subpartition.flush();
-               assertEquals(0, listener.getNumNotifications());
-       }
+               PipelinedSubpartitionView reader = new 
PipelinedSubpartitionView(
+                       subpartition, mock(BufferAvailabilityListener.class));
 
-       @Test
-       public void testBasicPipelinedProduceConsumeLogic() throws Exception {
-               final PipelinedSubpartition subpartition = createSubpartition();
+               assertFalse(reader.isReleased());
+               verify(subpartition, times(1)).isReleased();
 
-               BufferAvailabilityListener listener = 
mock(BufferAvailabilityListener.class);
-
-               ResultSubpartitionView view = 
subpartition.createReadView(listener);
-
-               // Empty => should return null
-               assertFalse(view.nextBufferIsEvent());
-               assertNoNextBuffer(view);
-               assertFalse(view.nextBufferIsEvent()); // also after 
getNextBuffer()
-               verify(listener, times(0)).notifyDataAvailable();
-
-               // Add data to the queue...
-               subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
-               assertFalse(view.nextBufferIsEvent());
-
-               assertEquals(1, subpartition.getTotalNumberOfBuffers());
-               assertEquals(1, subpartition.getBuffersInBacklog());
-               assertEquals(0, subpartition.getTotalNumberOfBytes()); // only 
updated when getting the buffer
-
-               // ...should have resulted in a notification
-               verify(listener, times(1)).notifyDataAvailable();
-
-               // ...and one available result
-               assertNextBuffer(view, BUFFER_SIZE, false, 
subpartition.getBuffersInBacklog() - 1, false, true);
-               assertEquals(BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-               assertEquals(0, subpartition.getBuffersInBacklog());
-               assertNoNextBuffer(view);
-               assertEquals(0, subpartition.getBuffersInBacklog());
-
-               // Add data to the queue...
-               subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
-               assertFalse(view.nextBufferIsEvent());
-
-               assertEquals(2, subpartition.getTotalNumberOfBuffers());
-               assertEquals(1, subpartition.getBuffersInBacklog());
-               assertEquals(BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-               verify(listener, times(2)).notifyDataAvailable();
-
-               assertNextBuffer(view, BUFFER_SIZE, false, 
subpartition.getBuffersInBacklog() - 1, false, true);
-               assertEquals(2 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-               assertEquals(0, subpartition.getBuffersInBacklog());
-               assertNoNextBuffer(view);
-               assertEquals(0, subpartition.getBuffersInBacklog());
-
-               // some tests with events
-
-               // fill with: buffer, event, and buffer
-               subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
-               assertFalse(view.nextBufferIsEvent());
-               subpartition.add(createEventBufferConsumer(BUFFER_SIZE));
-               assertFalse(view.nextBufferIsEvent());
-               subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
-               assertFalse(view.nextBufferIsEvent());
-
-               assertEquals(5, subpartition.getTotalNumberOfBuffers());
-               assertEquals(2, subpartition.getBuffersInBacklog()); // two 
buffers (events don't count)
-               assertEquals(2 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-               verify(listener, times(4)).notifyDataAvailable();
-
-               // the first buffer
-               assertNextBuffer(view, BUFFER_SIZE, true, 
subpartition.getBuffersInBacklog() - 1, true, true);
-               assertEquals(3 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-               assertEquals(1, subpartition.getBuffersInBacklog());
-
-               // the event
-               assertNextEvent(view, BUFFER_SIZE, null, true, 
subpartition.getBuffersInBacklog(), false, true);
-               assertEquals(4 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-               assertEquals(1, subpartition.getBuffersInBacklog());
-
-               // the remaining buffer
-               assertNextBuffer(view, BUFFER_SIZE, false, 
subpartition.getBuffersInBacklog() - 1, false, true);
-               assertEquals(5 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-               assertEquals(0, subpartition.getBuffersInBacklog());
-
-               // nothing more
-               assertNoNextBuffer(view);
-               assertEquals(0, subpartition.getBuffersInBacklog());
-
-               assertEquals(5, subpartition.getTotalNumberOfBuffers());
-               assertEquals(5 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes());
-               verify(listener, times(4)).notifyDataAvailable();
+               when(subpartition.isReleased()).thenReturn(true);
+               assertTrue(reader.isReleased());
+               verify(subpartition, times(2)).isReleased();
        }
 
        @Test
@@ -394,25 +129,6 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
                testProduceConsume(true, true);
        }
 
-       /**
-        * Verifies that the isReleased() check of the view checks the parent
-        * subpartition.
-        */
-       @Test
-       public void testIsReleasedChecksParent() throws Exception {
-               PipelinedSubpartition subpartition = 
mock(PipelinedSubpartition.class);
-
-               PipelinedSubpartitionView reader = new 
PipelinedSubpartitionView(
-                               subpartition, 
mock(BufferAvailabilityListener.class));
-
-               assertFalse(reader.isReleased());
-               verify(subpartition, times(1)).isReleased();
-
-               when(subpartition.isReleased()).thenReturn(true);
-               assertTrue(reader.isReleased());
-               verify(subpartition, times(2)).isReleased();
-       }
-
        private void testProduceConsume(boolean isSlowProducer, boolean 
isSlowConsumer) throws Exception {
                // Config
                final int producerBufferPoolSize = 8;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
new file mode 100644
index 0000000..6f9920e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer;
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder;
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
+import static 
org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNextBuffer;
+import static 
org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNextEvent;
+import static 
org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNoNextBuffer;
+import static 
org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Additional tests for {@link PipelinedSubpartition} which require an 
availability listener and a
+ * read view.
+ *
+ * @see PipelinedSubpartitionTest
+ */
+public class PipelinedSubpartitionWithReadViewTest {
+
+       private PipelinedSubpartition subpartition;
+       private AwaitableBufferAvailablityListener availablityListener;
+       private PipelinedSubpartitionView readView;
+
+       @Before
+       public void setup() throws IOException {
+               final ResultPartition parent = mock(ResultPartition.class);
+               subpartition = new PipelinedSubpartition(0, parent);
+               availablityListener = new AwaitableBufferAvailablityListener();
+               readView = subpartition.createReadView(availablityListener);
+       }
+
+       @After
+       public void tearDown() {
+               readView.releaseAllResources();
+               subpartition.release();
+       }
+
+       @Test(expected = IllegalStateException.class)
+       public void testAddTwoNonFinishedBuffer() {
+               subpartition.add(createBufferBuilder().createBufferConsumer());
+               subpartition.add(createBufferBuilder().createBufferConsumer());
+               assertNull(readView.getNextBuffer());
+       }
+
+       @Test
+       public void testAddEmptyNonFinishedBuffer() {
+               assertEquals(0, availablityListener.getNumNotifications());
+
+               BufferBuilder bufferBuilder = createBufferBuilder();
+               subpartition.add(bufferBuilder.createBufferConsumer());
+
+               assertEquals(0, availablityListener.getNumNotifications());
+               assertNull(readView.getNextBuffer());
+
+               bufferBuilder.finish();
+               bufferBuilder = createBufferBuilder();
+               subpartition.add(bufferBuilder.createBufferConsumer());
+
+               assertEquals(1, availablityListener.getNumNotifications()); // 
notification from finishing previous buffer.
+               assertNull(readView.getNextBuffer());
+               assertEquals(1, subpartition.getBuffersInBacklog());
+       }
+
+       @Test
+       public void testAddNonEmptyNotFinishedBuffer() throws Exception {
+               assertEquals(0, availablityListener.getNumNotifications());
+
+               BufferBuilder bufferBuilder = createBufferBuilder();
+               bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024));
+               subpartition.add(bufferBuilder.createBufferConsumer());
+
+               // note that since the buffer builder is not finished, there is 
still a retained instance!
+               assertNextBuffer(readView, 1024, false, 1, false, false);
+               assertEquals(1, subpartition.getBuffersInBacklog());
+       }
+
+       /**
+        * Normally moreAvailable flag from InputChannel should ignore non 
finished BufferConsumers, otherwise we would
+        * busy loop on the unfinished BufferConsumers.
+        */
+       @Test
+       public void testUnfinishedBufferBehindFinished() throws Exception {
+               subpartition.add(createFilledBufferConsumer(1025)); // finished
+               
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // 
not finished
+
+               assertThat(availablityListener.getNumNotifications(), 
greaterThan(0L));
+               assertNextBuffer(readView, 1025, false, 1, false, true);
+               // not notified, but we could still access the unfinished buffer
+               assertNextBuffer(readView, 1024, false, 1, false, false);
+               assertNoNextBuffer(readView);
+       }
+
+       /**
+        * After flush call unfinished BufferConsumers should be reported as 
available, otherwise we might not flush some
+        * of the data.
+        */
+       @Test
+       public void testFlushWithUnfinishedBufferBehindFinished() throws 
Exception {
+               subpartition.add(createFilledBufferConsumer(1025)); // finished
+               
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // 
not finished
+               long oldNumNotifications = 
availablityListener.getNumNotifications();
+               subpartition.flush();
+               // buffer queue is > 1, should already be notified, no further 
notification necessary
+               assertThat(oldNumNotifications, greaterThan(0L));
+               assertEquals(oldNumNotifications, 
availablityListener.getNumNotifications());
+
+               assertNextBuffer(readView, 1025, true, 1, false, true);
+               assertNextBuffer(readView, 1024, false, 1, false, false);
+               assertNoNextBuffer(readView);
+       }
+
+       /**
+        * A flush call with a buffer size of 1 should always notify consumers 
(unless already flushed).
+        */
+       @Test
+       public void testFlushWithUnfinishedBufferBehindFinished2() throws 
Exception {
+               // no buffers -> no notification or any other effects
+               subpartition.flush();
+               assertEquals(0, availablityListener.getNumNotifications());
+
+               subpartition.add(createFilledBufferConsumer(1025)); // finished
+               
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // 
not finished
+
+               assertNextBuffer(readView, 1025, false, 1, false, true);
+
+               long oldNumNotifications = 
availablityListener.getNumNotifications();
+               subpartition.flush();
+               // buffer queue is 1 again -> need to flush
+               assertEquals(oldNumNotifications + 1, 
availablityListener.getNumNotifications());
+               subpartition.flush();
+               // calling again should not flush again
+               assertEquals(oldNumNotifications + 1, 
availablityListener.getNumNotifications());
+
+               assertNextBuffer(readView, 1024, false, 1, false, false);
+               assertNoNextBuffer(readView);
+       }
+
+       @Test
+       public void testMultipleEmptyBuffers() throws Exception {
+               assertEquals(0, availablityListener.getNumNotifications());
+
+               subpartition.add(createFilledBufferConsumer(0));
+
+               assertEquals(1, availablityListener.getNumNotifications());
+               subpartition.add(createFilledBufferConsumer(0));
+               assertEquals(2, availablityListener.getNumNotifications());
+
+               subpartition.add(createFilledBufferConsumer(0));
+               assertEquals(2, availablityListener.getNumNotifications());
+               assertEquals(3, subpartition.getBuffersInBacklog());
+
+               subpartition.add(createFilledBufferConsumer(1024));
+               assertEquals(2, availablityListener.getNumNotifications());
+
+               assertNextBuffer(readView, 1024, false, 0, false, true);
+       }
+
+       @Test
+       public void testEmptyFlush()  {
+               subpartition.flush();
+               assertEquals(0, availablityListener.getNumNotifications());
+       }
+
+       @Test
+       public void testBasicPipelinedProduceConsumeLogic() throws Exception {
+               // Empty => should return null
+               assertFalse(readView.nextBufferIsEvent());
+               assertNoNextBuffer(readView);
+               assertFalse(readView.nextBufferIsEvent()); // also after 
getNextBuffer()
+               assertEquals(0, availablityListener.getNumNotifications());
+
+               // Add data to the queue...
+               subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+               assertFalse(readView.nextBufferIsEvent());
+
+               assertEquals(1, subpartition.getTotalNumberOfBuffers());
+               assertEquals(1, subpartition.getBuffersInBacklog());
+               assertEquals(0, subpartition.getTotalNumberOfBytes()); // only 
updated when getting the buffer
+
+               // ...should have resulted in a notification
+               assertEquals(1, availablityListener.getNumNotifications());
+
+               // ...and one available result
+               assertNextBuffer(readView, BUFFER_SIZE, false, 
subpartition.getBuffersInBacklog() - 1, false, true);
+               assertEquals(BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+               assertEquals(0, subpartition.getBuffersInBacklog());
+               assertNoNextBuffer(readView);
+               assertEquals(0, subpartition.getBuffersInBacklog());
+
+               // Add data to the queue...
+               subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+               assertFalse(readView.nextBufferIsEvent());
+
+               assertEquals(2, subpartition.getTotalNumberOfBuffers());
+               assertEquals(1, subpartition.getBuffersInBacklog());
+               assertEquals(BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+               assertEquals(2, availablityListener.getNumNotifications());
+
+               assertNextBuffer(readView, BUFFER_SIZE, false, 
subpartition.getBuffersInBacklog() - 1, false, true);
+               assertEquals(2 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+               assertEquals(0, subpartition.getBuffersInBacklog());
+               assertNoNextBuffer(readView);
+               assertEquals(0, subpartition.getBuffersInBacklog());
+
+               // some tests with events
+
+               // fill with: buffer, event, and buffer
+               subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+               assertFalse(readView.nextBufferIsEvent());
+               subpartition.add(createEventBufferConsumer(BUFFER_SIZE));
+               assertFalse(readView.nextBufferIsEvent());
+               subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+               assertFalse(readView.nextBufferIsEvent());
+
+               assertEquals(5, subpartition.getTotalNumberOfBuffers());
+               assertEquals(2, subpartition.getBuffersInBacklog()); // two 
buffers (events don't count)
+               assertEquals(2 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+               assertEquals(4, availablityListener.getNumNotifications());
+
+               // the first buffer
+               assertNextBuffer(readView, BUFFER_SIZE, true, 
subpartition.getBuffersInBacklog() - 1, true, true);
+               assertEquals(3 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+               assertEquals(1, subpartition.getBuffersInBacklog());
+
+               // the event
+               assertNextEvent(readView, BUFFER_SIZE, null, true, 
subpartition.getBuffersInBacklog(), false, true);
+               assertEquals(4 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+               assertEquals(1, subpartition.getBuffersInBacklog());
+
+               // the remaining buffer
+               assertNextBuffer(readView, BUFFER_SIZE, false, 
subpartition.getBuffersInBacklog() - 1, false, true);
+               assertEquals(5 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+               assertEquals(0, subpartition.getBuffersInBacklog());
+
+               // nothing more
+               assertNoNextBuffer(readView);
+               assertEquals(0, subpartition.getBuffersInBacklog());
+
+               assertEquals(5, subpartition.getTotalNumberOfBuffers());
+               assertEquals(5 * BUFFER_SIZE, 
subpartition.getTotalNumberOfBytes());
+               assertEquals(4, availablityListener.getNumNotifications());
+       }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 5989cf8..9f5e6d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -217,7 +217,7 @@ public abstract class SubpartitionTestBase extends 
TestLogger {
                assertEquals("recycled", expectedRecycledAfterRecycle, 
bufferAndBacklog.buffer().isRecycled());
        }
 
-       protected void assertNoNextBuffer(ResultSubpartitionView readView) 
throws IOException, InterruptedException {
+       static void assertNoNextBuffer(ResultSubpartitionView readView) throws 
IOException, InterruptedException {
                assertNull(readView.getNextBuffer());
        }
 }

Reply via email to