[FLINK-8208][network-tests] Reduce mockito usage in RecordWriterTest

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97db0bf9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97db0bf9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97db0bf9

Branch: refs/heads/master
Commit: 97db0bf9c1448a7e672f5d0235e301d03e1cf7d2
Parents: 409ea23
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Fri Dec 1 11:14:06 2017 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Mon Jan 8 11:46:00 2018 +0100

----------------------------------------------------------------------
 .../io/network/api/writer/RecordWriterTest.java | 60 +++++++++++---------
 .../network/util/TestPooledBufferProvider.java  |  6 +-
 .../runtime/io/StreamRecordWriterTest.java      | 17 +-----
 3 files changed, 41 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/97db0bf9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 63540c3..59b98a2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.testutils.DiscardingRecycler;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.XORShiftRandom;
@@ -172,12 +173,11 @@ public class RecordWriterTest {
 
        @Test
        public void testClearBuffersAfterExceptionInPartitionWriter() throws 
Exception {
-               NetworkBufferPool buffers = null;
+               NetworkBufferPool buffers = new NetworkBufferPool(1, 1024);
                BufferPool bufferPool = null;
 
                try {
-                       buffers = new NetworkBufferPool(1, 1024);
-                       bufferPool = spy(buffers.createBufferPool(1, 
Integer.MAX_VALUE));
+                       bufferPool = buffers.createBufferPool(1, 
Integer.MAX_VALUE);
 
                        ResultPartitionWriter partitionWriter = 
mock(ResultPartitionWriter.class);
                        
when(partitionWriter.getBufferProvider()).thenReturn(checkNotNull(bufferPool));
@@ -190,12 +190,19 @@ public class RecordWriterTest {
                                        Buffer buffer = (Buffer) 
invocation.getArguments()[0];
                                        buffer.recycle();
 
-                                       throw new RuntimeException("Expected 
test Exception");
+                                       throw new ExpectedTestException();
                                }
                        }).when(partitionWriter).writeBuffer(any(Buffer.class), 
anyInt());
 
                        RecordWriter<IntValue> recordWriter = new 
RecordWriter<>(partitionWriter);
 
+                       // Validate that memory segment was assigned to 
recordWriter
+                       assertEquals(1, 
buffers.getNumberOfAvailableMemorySegments());
+                       assertEquals(0, 
bufferPool.getNumberOfAvailableMemorySegments());
+                       recordWriter.emit(new IntValue(0));
+                       assertEquals(0, 
buffers.getNumberOfAvailableMemorySegments());
+                       assertEquals(0, 
bufferPool.getNumberOfAvailableMemorySegments());
+
                        try {
                                // Verify that emit correctly clears the 
buffer. The infinite loop looks
                                // dangerous indeed, but the buffer will only 
be flushed after it's full. Adding a
@@ -204,7 +211,7 @@ public class RecordWriterTest {
                                        recordWriter.emit(new IntValue(0));
                                }
                        }
-                       catch (Exception e) {
+                       catch (ExpectedTestException e) {
                                // Verify that the buffer is not part of the 
record writer state after a failure
                                // to flush it out. If the buffer is still part 
of the record writer state, this
                                // will fail, because the buffer has already 
been recycled. NOTE: The mock
@@ -214,66 +221,72 @@ public class RecordWriterTest {
 
                        // Verify expected methods have been called
                        verify(partitionWriter, 
times(1)).writeBuffer(any(Buffer.class), anyInt());
-                       verify(bufferPool, times(1)).requestBufferBlocking();
+                       assertEquals(1, 
bufferPool.getNumberOfAvailableMemorySegments());
 
                        try {
                                // Verify that manual flushing correctly clears 
the buffer.
                                recordWriter.emit(new IntValue(0));
+                               assertEquals(0, 
bufferPool.getNumberOfAvailableMemorySegments());
                                recordWriter.flush();
 
                                Assert.fail("Did not throw expected test 
Exception");
                        }
-                       catch (Exception e) {
+                       catch (ExpectedTestException e) {
                                recordWriter.clearBuffers();
                        }
 
                        // Verify expected methods have been called
                        verify(partitionWriter, 
times(2)).writeBuffer(any(Buffer.class), anyInt());
-                       verify(bufferPool, times(2)).requestBufferBlocking();
+                       assertEquals(1, 
bufferPool.getNumberOfAvailableMemorySegments());
 
                        try {
                                // Verify that broadcast emit correctly clears 
the buffer.
+                               recordWriter.broadcastEmit(new IntValue(0));
+                               assertEquals(0, 
bufferPool.getNumberOfAvailableMemorySegments());
+
                                for (;;) {
                                        recordWriter.broadcastEmit(new 
IntValue(0));
                                }
                        }
-                       catch (Exception e) {
+                       catch (ExpectedTestException e) {
                                recordWriter.clearBuffers();
                        }
 
                        // Verify expected methods have been called
                        verify(partitionWriter, 
times(3)).writeBuffer(any(Buffer.class), anyInt());
-                       verify(bufferPool, times(3)).requestBufferBlocking();
+                       assertEquals(1, 
bufferPool.getNumberOfAvailableMemorySegments());
 
                        try {
                                // Verify that end of super step correctly 
clears the buffer.
                                recordWriter.emit(new IntValue(0));
+                               assertEquals(0, 
bufferPool.getNumberOfAvailableMemorySegments());
                                
recordWriter.broadcastEvent(EndOfSuperstepEvent.INSTANCE);
 
                                Assert.fail("Did not throw expected test 
Exception");
                        }
-                       catch (Exception e) {
+                       catch (ExpectedTestException e) {
                                recordWriter.clearBuffers();
                        }
 
                        // Verify expected methods have been called
                        verify(partitionWriter, 
times(4)).writeBuffer(any(Buffer.class), anyInt());
-                       verify(bufferPool, times(4)).requestBufferBlocking();
+                       assertEquals(1, 
bufferPool.getNumberOfAvailableMemorySegments());
 
                        try {
                                // Verify that broadcasting an event correctly 
clears the buffer.
                                recordWriter.emit(new IntValue(0));
+                               assertEquals(0, 
bufferPool.getNumberOfAvailableMemorySegments());
                                recordWriter.broadcastEvent(new 
TestTaskEvent());
 
                                Assert.fail("Did not throw expected test 
Exception");
                        }
-                       catch (Exception e) {
+                       catch (ExpectedTestException e) {
                                recordWriter.clearBuffers();
                        }
 
                        // Verify expected methods have been called
                        verify(partitionWriter, 
times(5)).writeBuffer(any(Buffer.class), anyInt());
-                       verify(bufferPool, times(5)).requestBufferBlocking();
+                       assertEquals(1, 
bufferPool.getNumberOfAvailableMemorySegments());
                }
                finally {
                        if (bufferPool != null) {
@@ -281,20 +294,15 @@ public class RecordWriterTest {
                                bufferPool.lazyDestroy();
                        }
 
-                       if (buffers != null) {
-                               assertEquals(1, 
buffers.getNumberOfAvailableMemorySegments());
-                               buffers.destroy();
-                       }
+                       assertEquals(1, 
buffers.getNumberOfAvailableMemorySegments());
+                       buffers.destroy();
                }
        }
 
        @Test
        public void testSerializerClearedAfterClearBuffers() throws Exception {
-
-               final Buffer buffer = TestBufferFactory.createBuffer(16);
-
                ResultPartitionWriter partitionWriter = 
createResultPartitionWriter(
-                               createBufferProvider(buffer));
+                       new TestPooledBufferProvider(1, 16));
 
                RecordWriter<IntValue> recordWriter = new 
RecordWriter<IntValue>(partitionWriter);
 
@@ -324,7 +332,7 @@ public class RecordWriterTest {
                        queues[i] = new ArrayDeque<>();
                }
 
-               BufferProvider bufferProvider = 
createBufferProvider(bufferSize);
+               TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
 
                ResultPartitionWriter partitionWriter = 
createCollectingPartitionWriter(queues, bufferProvider);
                RecordWriter<ByteArrayIO> writer = new 
RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
@@ -333,7 +341,7 @@ public class RecordWriterTest {
                // No records emitted yet, broadcast should not request a buffer
                writer.broadcastEvent(barrier);
 
-               verify(bufferProvider, times(0)).requestBufferBlocking();
+               assertEquals(0, bufferProvider.getNumberOfCreatedBuffers());
 
                for (Queue<BufferOrEvent> queue : queues) {
                        assertEquals(1, queue.size());
@@ -360,7 +368,7 @@ public class RecordWriterTest {
                        queues[i] = new ArrayDeque<>();
                }
 
-               BufferProvider bufferProvider = 
createBufferProvider(bufferSize);
+               TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
 
                ResultPartitionWriter partitionWriter = 
createCollectingPartitionWriter(queues, bufferProvider);
                RecordWriter<ByteArrayIO> writer = new 
RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>());
@@ -393,7 +401,7 @@ public class RecordWriterTest {
                // (v) Broadcast the event
                writer.broadcastEvent(barrier);
 
-               verify(bufferProvider, times(4)).requestBufferBlocking();
+               assertEquals(4, bufferProvider.getNumberOfCreatedBuffers());
 
                assertEquals(2, queues[0].size()); // 1 buffer + 1 event
                assertEquals(3, queues[1].size()); // 2 buffers + 1 event

http://git-wip-us.apache.org/repos/asf/flink/blob/97db0bf9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
index a88f4ba..cc52549 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
@@ -43,10 +43,14 @@ public class TestPooledBufferProvider implements 
BufferProvider {
        private final PooledBufferProviderRecycler bufferRecycler;
 
        public TestPooledBufferProvider(int poolSize) {
+               this(poolSize, 32 * 1024);
+       }
+
+       public TestPooledBufferProvider(int poolSize, int bufferSize) {
                checkArgument(poolSize > 0);
 
                this.bufferRecycler = new PooledBufferProviderRecycler(buffers);
-               this.bufferFactory = new TestBufferFactory(poolSize, 32 * 1024, 
bufferRecycler);
+               this.bufferFactory = new TestBufferFactory(poolSize, 
bufferSize, bufferRecycler);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/97db0bf9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
index 78d4303..480cfd9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
@@ -19,18 +19,14 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import 
org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.types.LongValue;
 
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 
@@ -86,16 +82,7 @@ public class StreamRecordWriterTest {
        }
 
        private static ResultPartitionWriter getMockWriter(int numPartitions) 
throws Exception {
-               BufferProvider mockProvider = mock(BufferProvider.class);
-               when(mockProvider.requestBufferBlocking()).thenAnswer(new 
Answer<Buffer>() {
-                       @Override
-                       public Buffer answer(InvocationOnMock invocation) {
-                               return new Buffer(
-                                               
MemorySegmentFactory.allocateUnpooledSegment(4096),
-                                               FreeingBufferRecycler.INSTANCE);
-                       }
-               });
-
+               BufferProvider mockProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, 4096);
                ResultPartitionWriter mockWriter = 
mock(ResultPartitionWriter.class);
                when(mockWriter.getBufferProvider()).thenReturn(mockProvider);
                
when(mockWriter.getNumberOfSubpartitions()).thenReturn(numPartitions);

Reply via email to