[hotfix][tests] do not use a mocked BufferRecycler for unpooled memory segments

The mock will actually keep references to the segments instead of freeing them.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/997fab62
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/997fab62
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/997fab62

Branch: refs/heads/master
Commit: 997fab6247a0d0216a69b698ed049656aa358535
Parents: 705ba2e
Author: Nico Kruber <n...@data-artisans.com>
Authored: Tue Dec 12 15:05:14 2017 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Thu Jan 18 15:24:16 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/io/network/buffer/BufferTest.java   | 8 +++-----
 .../io/network/netty/NettyMessageSerializationTest.java      | 5 ++---
 .../io/network/partition/consumer/SingleInputGateTest.java   | 5 +++--
 3 files changed, 8 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/997fab62/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
index fd11d02..57c8077 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import java.nio.ByteBuffer;
 
@@ -34,9 +34,8 @@ public class BufferTest {
        @Test
        public void testSetGetSize() {
                final MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(1024);
-               final BufferRecycler recycler = 
Mockito.mock(BufferRecycler.class);
 
-               Buffer buffer = new Buffer(segment, recycler);
+               Buffer buffer = new Buffer(segment, 
FreeingBufferRecycler.INSTANCE);
                Assert.assertEquals(segment.size(), buffer.getSize());
 
                buffer.setSize(segment.size() / 2);
@@ -60,9 +59,8 @@ public class BufferTest {
        @Test
        public void testgetNioBufferThreadSafe() {
                final MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(1024);
-               final BufferRecycler recycler = 
Mockito.mock(BufferRecycler.class);
 
-               Buffer buffer = new Buffer(segment, recycler);
+               Buffer buffer = new Buffer(segment, 
FreeingBufferRecycler.INSTANCE);
 
                ByteBuffer buf1 = buffer.getNioBuffer();
                ByteBuffer buf2 = buffer.getNioBuffer();

http://git-wip-us.apache.org/repos/asf/flink/blob/997fab62/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
index 98614bc..daff1c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.event.task.IntegerTaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -38,7 +38,6 @@ import java.util.Random;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.spy;
@@ -55,7 +54,7 @@ public class NettyMessageSerializationTest {
        @Test
        public void testEncodeDecode() {
                {
-                       Buffer buffer = spy(new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), 
mock(BufferRecycler.class)));
+                       Buffer buffer = spy(new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), 
FreeingBufferRecycler.INSTANCE));
                        ByteBuffer nioBuffer = buffer.getNioBuffer();
 
                        for (int i = 0; i < 1024; i += 4) {

http://git-wip-us.apache.org/repos/asf/flink/blob/997fab62/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 59fa7a3..f45d98e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -46,6 +46,7 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -125,7 +126,7 @@ public class SingleInputGateTest {
 
                final ResultSubpartitionView iterator = 
mock(ResultSubpartitionView.class);
                when(iterator.getNextBuffer()).thenReturn(
-                       new BufferAndBacklog(new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), 
mock(BufferRecycler.class)), 0));
+                       new BufferAndBacklog(new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), 
FreeingBufferRecycler.INSTANCE), 0));
 
                final ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
                when(partitionManager.createSubpartitionView(

Reply via email to