[hotfix][tests] do not use a mocked BufferRecycler for unpooled memory segments
The mock will actually keep references to the segments instead of freeing them.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/997fab62
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/997fab62
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/997fab62

Branch: refs/heads/master
Commit: 997fab6247a0d0216a69b698ed049656aa358535
Parents: 705ba2e
Author: Nico Kruber <n...@data-artisans.com>
Authored: Tue Dec 12 15:05:14 2017 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Thu Jan 18 15:24:16 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/io/network/buffer/BufferTest.java | 8 +++-----
 .../io/network/netty/NettyMessageSerializationTest.java | 5 ++---
 .../io/network/partition/consumer/SingleInputGateTest.java | 5 +++--
 3 files changed, 8 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/997fab62/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
index fd11d02..57c8077 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.io.network.buffer;

 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Mockito;

 import java.nio.ByteBuffer;

@@ -34,9 +34,8 @@ public class BufferTest {
 	@Test
 	public void testSetGetSize()
{
 		final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
-		final BufferRecycler recycler = Mockito.mock(BufferRecycler.class);

-		Buffer buffer = new Buffer(segment, recycler);
+		Buffer buffer = new Buffer(segment, FreeingBufferRecycler.INSTANCE);
 		Assert.assertEquals(segment.size(), buffer.getSize());

 		buffer.setSize(segment.size() / 2);
@@ -60,9 +59,8 @@ public class BufferTest {
 	@Test
 	public void testgetNioBufferThreadSafe() {
 		final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
-		final BufferRecycler recycler = Mockito.mock(BufferRecycler.class);

-		Buffer buffer = new Buffer(segment, recycler);
+		Buffer buffer = new Buffer(segment, FreeingBufferRecycler.INSTANCE);

 		ByteBuffer buf1 = buffer.getNioBuffer();
 		ByteBuffer buf2 = buffer.getNioBuffer();

http://git-wip-us.apache.org/repos/asf/flink/blob/997fab62/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
index 98614bc..daff1c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.event.task.IntegerTaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -38,7 +38,6 @@ import java.util.Random;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.spy;
@@ -55,7 +54,7 @@ public class NettyMessageSerializationTest {
 	@Test
 	public void testEncodeDecode() {
 		{
-			Buffer buffer = spy(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), mock(BufferRecycler.class)));
+			Buffer buffer = spy(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE));
 			ByteBuffer nioBuffer = buffer.getNioBuffer();

 			for (int i = 0; i < 1024; i += 4) {

http://git-wip-us.apache.org/repos/asf/flink/blob/997fab62/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 59fa7a3..f45d98e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import
org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+
 import org.junit.Test;

 import java.io.IOException;
@@ -125,7 +126,7 @@ public class SingleInputGateTest {

 		final ResultSubpartitionView iterator = mock(ResultSubpartitionView.class);
 		when(iterator.getNextBuffer()).thenReturn(
-			new BufferAndBacklog(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), mock(BufferRecycler.class)), 0));
+			new BufferAndBacklog(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE), 0));

 		final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		when(partitionManager.createSubpartitionView(