This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 72d522efeef956cadeb8fe53778985855f8ee738 Author: Nico Kruber <n...@data-artisans.com> AuthorDate: Tue Sep 18 12:10:21 2018 +0200 [hotfix][network][tests] split PipelinedSubpartitionTest for better initialization - add PipelinedSubpartitionWithReadViewTest which always creates a subpartition, an availability listener, and a read view before each test and cleans up after each test - remove mockito use from testBasicPipelinedProduceConsumeLogic() --- .../partition/PipelinedSubpartitionTest.java | 314 +-------------------- .../PipelinedSubpartitionWithReadViewTest.java | 276 ++++++++++++++++++ .../io/network/partition/SubpartitionTestBase.java | 2 +- 3 files changed, 292 insertions(+), 300 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index b75bb7a..82f61ab 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -40,19 +40,12 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; -import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer; -import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer; -import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE; import static org.apache.flink.util.FutureUtil.waitForAll; import static org.apache.flink.util.Preconditions.checkState; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -62,6 +55,8 @@ import static org.mockito.Mockito.when; /** * Tests for {@link PipelinedSubpartition}. + * + * @see PipelinedSubpartitionWithReadViewTest */ public class PipelinedSubpartitionTest extends SubpartitionTestBase { @@ -80,189 +75,6 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { return new PipelinedSubpartition(0, parent); } - @Test(expected = IllegalStateException.class) - public void testAddTwoNonFinishedBuffer() throws Exception { - final ResultSubpartition subpartition = createSubpartition(); - AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener(); - ResultSubpartitionView readView = subpartition.createReadView(availablityListener); - availablityListener.resetNotificationCounters(); - - try { - subpartition.add(createBufferBuilder().createBufferConsumer()); - subpartition.add(createBufferBuilder().createBufferConsumer()); - assertNull(readView.getNextBuffer()); - } finally { - subpartition.release(); - } - } - - @Test - public void testAddEmptyNonFinishedBuffer() throws Exception { - final ResultSubpartition subpartition = createSubpartition(); - AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener(); - ResultSubpartitionView readView = subpartition.createReadView(availablityListener); - availablityListener.resetNotificationCounters(); - - try { - assertEquals(0, availablityListener.getNumNotifications()); - - BufferBuilder bufferBuilder = createBufferBuilder(); - subpartition.add(bufferBuilder.createBufferConsumer()); - - assertEquals(0, availablityListener.getNumNotifications()); - assertNull(readView.getNextBuffer()); - - bufferBuilder.finish(); - bufferBuilder = createBufferBuilder(); - subpartition.add(bufferBuilder.createBufferConsumer()); - - assertEquals(1, availablityListener.getNumNotifications()); // notification from finishing previous buffer. - assertNull(readView.getNextBuffer()); - assertEquals(1, subpartition.getBuffersInBacklog()); - } finally { - readView.releaseAllResources(); - subpartition.release(); - } - } - - @Test - public void testAddNonEmptyNotFinishedBuffer() throws Exception { - final ResultSubpartition subpartition = createSubpartition(); - AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener(); - ResultSubpartitionView readView = subpartition.createReadView(availablityListener); - availablityListener.resetNotificationCounters(); - - try { - assertEquals(0, availablityListener.getNumNotifications()); - - BufferBuilder bufferBuilder = createBufferBuilder(); - bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024)); - subpartition.add(bufferBuilder.createBufferConsumer()); - - // note that since the buffer builder is not finished, there is still a retained instance! - assertNextBuffer(readView, 1024, false, 1, false, false); - assertEquals(1, subpartition.getBuffersInBacklog()); - } finally { - readView.releaseAllResources(); - subpartition.release(); - } - } - - /** - * Normally moreAvailable flag from InputChannel should ignore non finished BufferConsumers, otherwise we would - * busy loop on the unfinished BufferConsumers. - */ - @Test - public void testUnfinishedBufferBehindFinished() throws Exception { - final ResultSubpartition subpartition = createSubpartition(); - AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener(); - ResultSubpartitionView readView = subpartition.createReadView(availablityListener); - - try { - subpartition.add(createFilledBufferConsumer(1025)); // finished - subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished - - assertThat(availablityListener.getNumNotifications(), greaterThan(0L)); - assertNextBuffer(readView, 1025, false, 1, false, true); - // not notified, but we could still access the unfinished buffer - assertNextBuffer(readView, 1024, false, 1, false, false); - assertNoNextBuffer(readView); - } finally { - subpartition.release(); - } - } - - /** - * After flush call unfinished BufferConsumers should be reported as available, otherwise we might not flush some - * of the data. - */ - @Test - public void testFlushWithUnfinishedBufferBehindFinished() throws Exception { - final ResultSubpartition subpartition = createSubpartition(); - AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener(); - ResultSubpartitionView readView = subpartition.createReadView(availablityListener); - - try { - subpartition.add(createFilledBufferConsumer(1025)); // finished - subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished - long oldNumNotifications = availablityListener.getNumNotifications(); - subpartition.flush(); - // buffer queue is > 1, should already be notified, no further notification necessary - assertThat(oldNumNotifications, greaterThan(0L)); - assertEquals(oldNumNotifications, availablityListener.getNumNotifications()); - - assertNextBuffer(readView, 1025, true, 1, false, true); - assertNextBuffer(readView, 1024, false, 1, false, false); - assertNoNextBuffer(readView); - } finally { - subpartition.release(); - } - } - - /** - * A flush call with a buffer size of 1 should always notify consumers (unless already flushed). - */ - @Test - public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception { - final ResultSubpartition subpartition = createSubpartition(); - AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener(); - ResultSubpartitionView readView = subpartition.createReadView(availablityListener); - - try { - // no buffers -> no notification or any other effects - subpartition.flush(); - assertEquals(0, availablityListener.getNumNotifications()); - - subpartition.add(createFilledBufferConsumer(1025)); // finished - subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished - - assertNextBuffer(readView, 1025, false, 1, false, true); - - long oldNumNotifications = availablityListener.getNumNotifications(); - subpartition.flush(); - // buffer queue is 1 again -> need to flush - assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications()); - subpartition.flush(); - // calling again should not flush again - assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications()); - - assertNextBuffer(readView, 1024, false, 1, false, false); - assertNoNextBuffer(readView); - } finally { - subpartition.release(); - } - } - - @Test - public void testMultipleEmptyBuffers() throws Exception { - final ResultSubpartition subpartition = createSubpartition(); - AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener(); - ResultSubpartitionView readView = subpartition.createReadView(availablityListener); - availablityListener.resetNotificationCounters(); - - try { - assertEquals(0, availablityListener.getNumNotifications()); - - subpartition.add(createFilledBufferConsumer(0)); - - assertEquals(1, availablityListener.getNumNotifications()); - subpartition.add(createFilledBufferConsumer(0)); - assertEquals(2, availablityListener.getNumNotifications()); - - subpartition.add(createFilledBufferConsumer(0)); - assertEquals(2, availablityListener.getNumNotifications()); - assertEquals(3, subpartition.getBuffersInBacklog()); - - subpartition.add(createFilledBufferConsumer(1024)); - assertEquals(2, availablityListener.getNumNotifications()); - - assertNextBuffer(readView, 1024, false, 0, false, true); - } finally { - readView.releaseAllResources(); - subpartition.release(); - } - } - @Test public void testIllegalReadViewRequest() throws Exception { final PipelinedSubpartition subpartition = createSubpartition(); @@ -278,100 +90,23 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { } } + /** + * Verifies that the isReleased() check of the view checks the parent + * subpartition. + */ @Test - public void testEmptyFlush() throws Exception { - final PipelinedSubpartition subpartition = createSubpartition(); + public void testIsReleasedChecksParent() { + PipelinedSubpartition subpartition = mock(PipelinedSubpartition.class); - AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener(); - subpartition.createReadView(listener); - subpartition.flush(); - assertEquals(0, listener.getNumNotifications()); - } + PipelinedSubpartitionView reader = new PipelinedSubpartitionView( + subpartition, mock(BufferAvailabilityListener.class)); - @Test - public void testBasicPipelinedProduceConsumeLogic() throws Exception { - final PipelinedSubpartition subpartition = createSubpartition(); + assertFalse(reader.isReleased()); + verify(subpartition, times(1)).isReleased(); - BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class); - - ResultSubpartitionView view = subpartition.createReadView(listener); - - // Empty => should return null - assertFalse(view.nextBufferIsEvent()); - assertNoNextBuffer(view); - assertFalse(view.nextBufferIsEvent()); // also after getNextBuffer() - verify(listener, times(0)).notifyDataAvailable(); - - // Add data to the queue... - subpartition.add(createFilledBufferConsumer(BUFFER_SIZE)); - assertFalse(view.nextBufferIsEvent()); - - assertEquals(1, subpartition.getTotalNumberOfBuffers()); - assertEquals(1, subpartition.getBuffersInBacklog()); - assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer - - // ...should have resulted in a notification - verify(listener, times(1)).notifyDataAvailable(); - - // ...and one available result - assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true); - assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer - assertEquals(0, subpartition.getBuffersInBacklog()); - assertNoNextBuffer(view); - assertEquals(0, subpartition.getBuffersInBacklog()); - - // Add data to the queue... - subpartition.add(createFilledBufferConsumer(BUFFER_SIZE)); - assertFalse(view.nextBufferIsEvent()); - - assertEquals(2, subpartition.getTotalNumberOfBuffers()); - assertEquals(1, subpartition.getBuffersInBacklog()); - assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer - verify(listener, times(2)).notifyDataAvailable(); - - assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true); - assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer - assertEquals(0, subpartition.getBuffersInBacklog()); - assertNoNextBuffer(view); - assertEquals(0, subpartition.getBuffersInBacklog()); - - // some tests with events - - // fill with: buffer, event, and buffer - subpartition.add(createFilledBufferConsumer(BUFFER_SIZE)); - assertFalse(view.nextBufferIsEvent()); - subpartition.add(createEventBufferConsumer(BUFFER_SIZE)); - assertFalse(view.nextBufferIsEvent()); - subpartition.add(createFilledBufferConsumer(BUFFER_SIZE)); - assertFalse(view.nextBufferIsEvent()); - - assertEquals(5, subpartition.getTotalNumberOfBuffers()); - assertEquals(2, subpartition.getBuffersInBacklog()); // two buffers (events don't count) - assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer - verify(listener, times(4)).notifyDataAvailable(); - - // the first buffer - assertNextBuffer(view, BUFFER_SIZE, true, subpartition.getBuffersInBacklog() - 1, true, true); - assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer - assertEquals(1, subpartition.getBuffersInBacklog()); - - // the event - assertNextEvent(view, BUFFER_SIZE, null, true, subpartition.getBuffersInBacklog(), false, true); - assertEquals(4 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer - assertEquals(1, subpartition.getBuffersInBacklog()); - - // the remaining buffer - assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true); - assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer - assertEquals(0, subpartition.getBuffersInBacklog()); - - // nothing more - assertNoNextBuffer(view); - assertEquals(0, subpartition.getBuffersInBacklog()); - - assertEquals(5, subpartition.getTotalNumberOfBuffers()); - assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); - verify(listener, times(4)).notifyDataAvailable(); + when(subpartition.isReleased()).thenReturn(true); + assertTrue(reader.isReleased()); + verify(subpartition, times(2)).isReleased(); } @Test @@ -394,25 +129,6 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { testProduceConsume(true, true); } - /** - * Verifies that the isReleased() check of the view checks the parent - * subpartition. - */ - @Test - public void testIsReleasedChecksParent() throws Exception { - PipelinedSubpartition subpartition = mock(PipelinedSubpartition.class); - - PipelinedSubpartitionView reader = new PipelinedSubpartitionView( - subpartition, mock(BufferAvailabilityListener.class)); - - assertFalse(reader.isReleased()); - verify(subpartition, times(1)).isReleased(); - - when(subpartition.isReleased()).thenReturn(true); - assertTrue(reader.isReleased()); - verify(subpartition, times(2)).isReleased(); - } - private void testProduceConsume(boolean isSlowProducer, boolean isSlowConsumer) throws Exception { // Config final int producerBufferPoolSize = 8; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java new file mode 100644 index 0000000..6f9920e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; +import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer; +import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder; +import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer; +import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNextBuffer; +import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNextEvent; +import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNoNextBuffer; +import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; + +/** + * Additional tests for {@link PipelinedSubpartition} which require an availability listener and a + * read view. + * + * @see PipelinedSubpartitionTest + */ +public class PipelinedSubpartitionWithReadViewTest { + + private PipelinedSubpartition subpartition; + private AwaitableBufferAvailablityListener availablityListener; + private PipelinedSubpartitionView readView; + + @Before + public void setup() throws IOException { + final ResultPartition parent = mock(ResultPartition.class); + subpartition = new PipelinedSubpartition(0, parent); + availablityListener = new AwaitableBufferAvailablityListener(); + readView = subpartition.createReadView(availablityListener); + } + + @After + public void tearDown() { + readView.releaseAllResources(); + subpartition.release(); + } + + @Test(expected = IllegalStateException.class) + public void testAddTwoNonFinishedBuffer() { + subpartition.add(createBufferBuilder().createBufferConsumer()); + subpartition.add(createBufferBuilder().createBufferConsumer()); + assertNull(readView.getNextBuffer()); + } + + @Test + public void testAddEmptyNonFinishedBuffer() { + assertEquals(0, availablityListener.getNumNotifications()); + + BufferBuilder bufferBuilder = createBufferBuilder(); + subpartition.add(bufferBuilder.createBufferConsumer()); + + assertEquals(0, availablityListener.getNumNotifications()); + assertNull(readView.getNextBuffer()); + + bufferBuilder.finish(); + bufferBuilder = createBufferBuilder(); + subpartition.add(bufferBuilder.createBufferConsumer()); + + assertEquals(1, availablityListener.getNumNotifications()); // notification from finishing previous buffer. + assertNull(readView.getNextBuffer()); + assertEquals(1, subpartition.getBuffersInBacklog()); + } + + @Test + public void testAddNonEmptyNotFinishedBuffer() throws Exception { + assertEquals(0, availablityListener.getNumNotifications()); + + BufferBuilder bufferBuilder = createBufferBuilder(); + bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024)); + subpartition.add(bufferBuilder.createBufferConsumer()); + + // note that since the buffer builder is not finished, there is still a retained instance! + assertNextBuffer(readView, 1024, false, 1, false, false); + assertEquals(1, subpartition.getBuffersInBacklog()); + } + + /** + * Normally moreAvailable flag from InputChannel should ignore non finished BufferConsumers, otherwise we would + * busy loop on the unfinished BufferConsumers. + */ + @Test + public void testUnfinishedBufferBehindFinished() throws Exception { + subpartition.add(createFilledBufferConsumer(1025)); // finished + subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished + + assertThat(availablityListener.getNumNotifications(), greaterThan(0L)); + assertNextBuffer(readView, 1025, false, 1, false, true); + // not notified, but we could still access the unfinished buffer + assertNextBuffer(readView, 1024, false, 1, false, false); + assertNoNextBuffer(readView); + } + + /** + * After flush call unfinished BufferConsumers should be reported as available, otherwise we might not flush some + * of the data. + */ + @Test + public void testFlushWithUnfinishedBufferBehindFinished() throws Exception { + subpartition.add(createFilledBufferConsumer(1025)); // finished + subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished + long oldNumNotifications = availablityListener.getNumNotifications(); + subpartition.flush(); + // buffer queue is > 1, should already be notified, no further notification necessary + assertThat(oldNumNotifications, greaterThan(0L)); + assertEquals(oldNumNotifications, availablityListener.getNumNotifications()); + + assertNextBuffer(readView, 1025, true, 1, false, true); + assertNextBuffer(readView, 1024, false, 1, false, false); + assertNoNextBuffer(readView); + } + + /** + * A flush call with a buffer size of 1 should always notify consumers (unless already flushed). + */ + @Test + public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception { + // no buffers -> no notification or any other effects + subpartition.flush(); + assertEquals(0, availablityListener.getNumNotifications()); + + subpartition.add(createFilledBufferConsumer(1025)); // finished + subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished + + assertNextBuffer(readView, 1025, false, 1, false, true); + + long oldNumNotifications = availablityListener.getNumNotifications(); + subpartition.flush(); + // buffer queue is 1 again -> need to flush + assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications()); + subpartition.flush(); + // calling again should not flush again + assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications()); + + assertNextBuffer(readView, 1024, false, 1, false, false); + assertNoNextBuffer(readView); + } + + @Test + public void testMultipleEmptyBuffers() throws Exception { + assertEquals(0, availablityListener.getNumNotifications()); + + subpartition.add(createFilledBufferConsumer(0)); + + assertEquals(1, availablityListener.getNumNotifications()); + subpartition.add(createFilledBufferConsumer(0)); + assertEquals(2, availablityListener.getNumNotifications()); + + subpartition.add(createFilledBufferConsumer(0)); + assertEquals(2, availablityListener.getNumNotifications()); + assertEquals(3, subpartition.getBuffersInBacklog()); + + subpartition.add(createFilledBufferConsumer(1024)); + assertEquals(2, availablityListener.getNumNotifications()); + + assertNextBuffer(readView, 1024, false, 0, false, true); + } + + @Test + public void testEmptyFlush() { + subpartition.flush(); + assertEquals(0, availablityListener.getNumNotifications()); + } + + @Test + public void testBasicPipelinedProduceConsumeLogic() throws Exception { + // Empty => should return null + assertFalse(readView.nextBufferIsEvent()); + assertNoNextBuffer(readView); + assertFalse(readView.nextBufferIsEvent()); // also after getNextBuffer() + assertEquals(0, availablityListener.getNumNotifications()); + + // Add data to the queue... + subpartition.add(createFilledBufferConsumer(BUFFER_SIZE)); + assertFalse(readView.nextBufferIsEvent()); + + assertEquals(1, subpartition.getTotalNumberOfBuffers()); + assertEquals(1, subpartition.getBuffersInBacklog()); + assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer + + // ...should have resulted in a notification + assertEquals(1, availablityListener.getNumNotifications()); + + // ...and one available result + assertNextBuffer(readView, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true); + assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer + assertEquals(0, subpartition.getBuffersInBacklog()); + assertNoNextBuffer(readView); + assertEquals(0, subpartition.getBuffersInBacklog()); + + // Add data to the queue... + subpartition.add(createFilledBufferConsumer(BUFFER_SIZE)); + assertFalse(readView.nextBufferIsEvent()); + + assertEquals(2, subpartition.getTotalNumberOfBuffers()); + assertEquals(1, subpartition.getBuffersInBacklog()); + assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer + assertEquals(2, availablityListener.getNumNotifications()); + + assertNextBuffer(readView, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true); + assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer + assertEquals(0, subpartition.getBuffersInBacklog()); + assertNoNextBuffer(readView); + assertEquals(0, subpartition.getBuffersInBacklog()); + + // some tests with events + + // fill with: buffer, event, and buffer + subpartition.add(createFilledBufferConsumer(BUFFER_SIZE)); + assertFalse(readView.nextBufferIsEvent()); + subpartition.add(createEventBufferConsumer(BUFFER_SIZE)); + assertFalse(readView.nextBufferIsEvent()); + subpartition.add(createFilledBufferConsumer(BUFFER_SIZE)); + assertFalse(readView.nextBufferIsEvent()); + + assertEquals(5, subpartition.getTotalNumberOfBuffers()); + assertEquals(2, subpartition.getBuffersInBacklog()); // two buffers (events don't count) + assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer + assertEquals(4, availablityListener.getNumNotifications()); + + // the first buffer + assertNextBuffer(readView, BUFFER_SIZE, true, subpartition.getBuffersInBacklog() - 1, true, true); + assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer + assertEquals(1, subpartition.getBuffersInBacklog()); + + // the event + assertNextEvent(readView, BUFFER_SIZE, null, true, subpartition.getBuffersInBacklog(), false, true); + assertEquals(4 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer + assertEquals(1, subpartition.getBuffersInBacklog()); + + // the remaining buffer + assertNextBuffer(readView, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true); + assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer + assertEquals(0, subpartition.getBuffersInBacklog()); + + // nothing more + assertNoNextBuffer(readView); + assertEquals(0, subpartition.getBuffersInBacklog()); + + assertEquals(5, subpartition.getTotalNumberOfBuffers()); + assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); + assertEquals(4, availablityListener.getNumNotifications()); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java index 5989cf8..9f5e6d0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java @@ -217,7 +217,7 @@ public abstract class SubpartitionTestBase extends TestLogger { assertEquals("recycled", expectedRecycledAfterRecycle, bufferAndBacklog.buffer().isRecycled()); } - protected void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException { + static void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException { assertNull(readView.getNextBuffer()); } }