[FLINK-8208][network-tests] Reduce mockito usage in RecordWriterTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97db0bf9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97db0bf9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97db0bf9 Branch: refs/heads/master Commit: 97db0bf9c1448a7e672f5d0235e301d03e1cf7d2 Parents: 409ea23 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Fri Dec 1 11:14:06 2017 +0100 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Mon Jan 8 11:46:00 2018 +0100 ---------------------------------------------------------------------- .../io/network/api/writer/RecordWriterTest.java | 60 +++++++++++--------- .../network/util/TestPooledBufferProvider.java | 6 +- .../runtime/io/StreamRecordWriterTest.java | 17 +----- 3 files changed, 41 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/97db0bf9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java index 63540c3..59b98a2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.util.TestBufferFactory; import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; import org.apache.flink.runtime.io.network.util.TestTaskEvent; +import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.runtime.testutils.DiscardingRecycler; import org.apache.flink.types.IntValue; import org.apache.flink.util.XORShiftRandom; @@ -172,12 +173,11 @@ public class RecordWriterTest { @Test public void testClearBuffersAfterExceptionInPartitionWriter() throws Exception { - NetworkBufferPool buffers = null; + NetworkBufferPool buffers = new NetworkBufferPool(1, 1024); BufferPool bufferPool = null; try { - buffers = new NetworkBufferPool(1, 1024); - bufferPool = spy(buffers.createBufferPool(1, Integer.MAX_VALUE)); + bufferPool = buffers.createBufferPool(1, Integer.MAX_VALUE); ResultPartitionWriter partitionWriter = mock(ResultPartitionWriter.class); when(partitionWriter.getBufferProvider()).thenReturn(checkNotNull(bufferPool)); @@ -190,12 +190,19 @@ public class RecordWriterTest { Buffer buffer = (Buffer) invocation.getArguments()[0]; buffer.recycle(); - throw new RuntimeException("Expected test Exception"); + throw new ExpectedTestException(); } }).when(partitionWriter).writeBuffer(any(Buffer.class), anyInt()); RecordWriter<IntValue> recordWriter = new RecordWriter<>(partitionWriter); + // Validate that memory segment was assigned to recordWriter + assertEquals(1, buffers.getNumberOfAvailableMemorySegments()); + assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments()); + recordWriter.emit(new IntValue(0)); + assertEquals(0, buffers.getNumberOfAvailableMemorySegments()); + assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments()); + try { // Verify that emit correctly clears the buffer. The infinite loop looks // dangerous indeed, but the buffer will only be flushed after its full. Adding a @@ -204,7 +211,7 @@ public class RecordWriterTest { recordWriter.emit(new IntValue(0)); } } - catch (Exception e) { + catch (ExpectedTestException e) { // Verify that the buffer is not part of the record writer state after a failure // to flush it out. If the buffer is still part of the record writer state, this // will fail, because the buffer has already been recycled. NOTE: The mock @@ -214,66 +221,72 @@ public class RecordWriterTest { // Verify expected methods have been called verify(partitionWriter, times(1)).writeBuffer(any(Buffer.class), anyInt()); - verify(bufferPool, times(1)).requestBufferBlocking(); + assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments()); try { // Verify that manual flushing correctly clears the buffer. recordWriter.emit(new IntValue(0)); + assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments()); recordWriter.flush(); Assert.fail("Did not throw expected test Exception"); } - catch (Exception e) { + catch (ExpectedTestException e) { recordWriter.clearBuffers(); } // Verify expected methods have been called verify(partitionWriter, times(2)).writeBuffer(any(Buffer.class), anyInt()); - verify(bufferPool, times(2)).requestBufferBlocking(); + assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments()); try { // Verify that broadcast emit correctly clears the buffer. + recordWriter.broadcastEmit(new IntValue(0)); + assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments()); + for (;;) { recordWriter.broadcastEmit(new IntValue(0)); } } - catch (Exception e) { + catch (ExpectedTestException e) { recordWriter.clearBuffers(); } // Verify expected methods have been called verify(partitionWriter, times(3)).writeBuffer(any(Buffer.class), anyInt()); - verify(bufferPool, times(3)).requestBufferBlocking(); + assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments()); try { // Verify that end of super step correctly clears the buffer. recordWriter.emit(new IntValue(0)); + assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments()); recordWriter.broadcastEvent(EndOfSuperstepEvent.INSTANCE); Assert.fail("Did not throw expected test Exception"); } - catch (Exception e) { + catch (ExpectedTestException e) { recordWriter.clearBuffers(); } // Verify expected methods have been called verify(partitionWriter, times(4)).writeBuffer(any(Buffer.class), anyInt()); - verify(bufferPool, times(4)).requestBufferBlocking(); + assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments()); try { // Verify that broadcasting and event correctly clears the buffer. recordWriter.emit(new IntValue(0)); + assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments()); recordWriter.broadcastEvent(new TestTaskEvent()); Assert.fail("Did not throw expected test Exception"); } - catch (Exception e) { + catch (ExpectedTestException e) { recordWriter.clearBuffers(); } // Verify expected methods have been called verify(partitionWriter, times(5)).writeBuffer(any(Buffer.class), anyInt()); - verify(bufferPool, times(5)).requestBufferBlocking(); + assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments()); } finally { if (bufferPool != null) { @@ -281,20 +294,15 @@ public class RecordWriterTest { bufferPool.lazyDestroy(); } - if (buffers != null) { - assertEquals(1, buffers.getNumberOfAvailableMemorySegments()); - buffers.destroy(); - } + assertEquals(1, buffers.getNumberOfAvailableMemorySegments()); + buffers.destroy(); } } @Test public void testSerializerClearedAfterClearBuffers() throws Exception { - - final Buffer buffer = TestBufferFactory.createBuffer(16); - ResultPartitionWriter partitionWriter = createResultPartitionWriter( - createBufferProvider(buffer)); + new TestPooledBufferProvider(1, 16)); RecordWriter<IntValue> recordWriter = new RecordWriter<IntValue>(partitionWriter); @@ -324,7 +332,7 @@ public class RecordWriterTest { queues[i] = new ArrayDeque<>(); } - BufferProvider bufferProvider = createBufferProvider(bufferSize); + TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize); ResultPartitionWriter partitionWriter = createCollectingPartitionWriter(queues, bufferProvider); RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>()); @@ -333,7 +341,7 @@ public class RecordWriterTest { // No records emitted yet, broadcast should not request a buffer writer.broadcastEvent(barrier); - verify(bufferProvider, times(0)).requestBufferBlocking(); + assertEquals(0, bufferProvider.getNumberOfCreatedBuffers()); for (Queue<BufferOrEvent> queue : queues) { assertEquals(1, queue.size()); @@ -360,7 +368,7 @@ public class RecordWriterTest { queues[i] = new ArrayDeque<>(); } - BufferProvider bufferProvider = createBufferProvider(bufferSize); + TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize); ResultPartitionWriter partitionWriter = createCollectingPartitionWriter(queues, bufferProvider); RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>()); @@ -393,7 +401,7 @@ public class RecordWriterTest { // (v) Broadcast the event writer.broadcastEvent(barrier); - verify(bufferProvider, times(4)).requestBufferBlocking(); + assertEquals(4, bufferProvider.getNumberOfCreatedBuffers()); assertEquals(2, queues[0].size()); // 1 buffer + 1 event assertEquals(3, queues[1].size()); // 2 buffers + 1 event http://git-wip-us.apache.org/repos/asf/flink/blob/97db0bf9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java index a88f4ba..cc52549 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java @@ -43,10 +43,14 @@ public class TestPooledBufferProvider implements BufferProvider { private final PooledBufferProviderRecycler bufferRecycler; public TestPooledBufferProvider(int poolSize) { + this(poolSize, 32 * 1024); + } + + public TestPooledBufferProvider(int poolSize, int bufferSize) { checkArgument(poolSize > 0); this.bufferRecycler = new PooledBufferProviderRecycler(buffers); - this.bufferFactory = new TestBufferFactory(poolSize, 32 * 1024, bufferRecycler); + this.bufferFactory = new TestBufferFactory(poolSize, bufferSize, bufferRecycler); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/97db0bf9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java index 78d4303..480cfd9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java @@ -19,18 +19,14 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.network.api.writer.ChannelSelector; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector; -import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; -import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; import org.apache.flink.types.LongValue; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.io.IOException; @@ -86,16 +82,7 @@ public class StreamRecordWriterTest { } private static ResultPartitionWriter getMockWriter(int numPartitions) throws Exception { - BufferProvider mockProvider = mock(BufferProvider.class); - when(mockProvider.requestBufferBlocking()).thenAnswer(new Answer<Buffer>() { - @Override - public Buffer answer(InvocationOnMock invocation) { - return new Buffer( - MemorySegmentFactory.allocateUnpooledSegment(4096), - FreeingBufferRecycler.INSTANCE); - } - }); - + BufferProvider mockProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, 4096); ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class); when(mockWriter.getBufferProvider()).thenReturn(mockProvider); when(mockWriter.getNumberOfSubpartitions()).thenReturn(numPartitions);