[hotfix][tests] make SpillableSubpartitionTest use TestBufferFactory.createBuffer
(this simplifies the test setups) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b3fc7939 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b3fc7939 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b3fc7939 Branch: refs/heads/master Commit: b3fc79392343ff1ba364254b194ec70d2bf43dc0 Parents: 997fab6 Author: Nico Kruber <n...@data-artisans.com> Authored: Wed Jan 3 15:57:47 2018 +0100 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Thu Jan 18 15:24:16 2018 +0100 ---------------------------------------------------------------------- .../partition/SpillableSubpartitionTest.java | 33 +++++++------------- 1 file changed, 12 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b3fc7939/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java index db94c81..3b00b2e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter; import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; @@ -27,10 +26,10 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; -import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; -import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; +import org.apache.flink.runtime.io.network.util.TestBufferFactory; import org.junit.AfterClass; import org.junit.Assert; @@ -194,7 +193,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { public void testConsumeSpilledPartition() throws Exception { SpillableSubpartition partition = createSubpartition(); - Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE); + Buffer buffer = TestBufferFactory.createBuffer(4096); buffer.retain(); buffer.retain(); @@ -292,7 +291,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception { SpillableSubpartition partition = createSubpartition(); - Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE); + Buffer buffer = TestBufferFactory.createBuffer(4096); buffer.retain(); buffer.retain(); @@ -404,8 +403,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { assertEquals(1, partition.getTotalNumberOfBuffers()); assertEquals(4, partition.getTotalNumberOfBytes()); - Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), - FreeingBufferRecycler.INSTANCE); + Buffer buffer = TestBufferFactory.createBuffer(4096); try { partition.add(buffer); } finally { @@ -449,8 +447,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { assertEquals(0, partition.releaseMemory()); } - Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), - FreeingBufferRecycler.INSTANCE); + Buffer buffer = TestBufferFactory.createBuffer(4096); boolean bufferRecycled; try { partition.add(buffer); @@ -478,8 +475,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { SpillableSubpartition partition = createSubpartition(ioManager); assertEquals(0, partition.releaseMemory()); - Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), - FreeingBufferRecycler.INSTANCE); + Buffer buffer = TestBufferFactory.createBuffer(4096); boolean bufferRecycled; try { partition.add(buffer); @@ -525,10 +521,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { IOManager ioManager = new IOManagerAsyncWithNoOpBufferFileWriter(); SpillableSubpartition partition = createSubpartition(ioManager); - Buffer buffer1 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), - FreeingBufferRecycler.INSTANCE); - Buffer buffer2 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), - FreeingBufferRecycler.INSTANCE); + Buffer buffer1 = TestBufferFactory.createBuffer(4096); + Buffer buffer2 = TestBufferFactory.createBuffer(4096); try { // we need two buffers because the view will use one of them and not release it partition.add(buffer1); @@ -575,8 +569,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { exception.expect(IOException.class); - Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), - FreeingBufferRecycler.INSTANCE); + Buffer buffer = TestBufferFactory.createBuffer(4096); boolean bufferRecycled; try { partition.add(buffer); @@ -642,10 +635,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { private void testCleanupReleasedPartition(boolean spilled, boolean createView) throws Exception { SpillableSubpartition partition = createSubpartition(); - Buffer buffer1 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), - FreeingBufferRecycler.INSTANCE); - Buffer buffer2 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), - FreeingBufferRecycler.INSTANCE); + Buffer buffer1 = TestBufferFactory.createBuffer(4096); + Buffer buffer2 = TestBufferFactory.createBuffer(4096); boolean buffer1Recycled; boolean buffer2Recycled; try {