[hotfix][tests] make SpillableSubpartitionTest use TestBufferFactory.createBuffer

(this simplifies the test setups)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b3fc7939
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b3fc7939
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b3fc7939

Branch: refs/heads/master
Commit: b3fc79392343ff1ba364254b194ec70d2bf43dc0
Parents: 997fab6
Author: Nico Kruber <n...@data-artisans.com>
Authored: Wed Jan 3 15:57:47 2018 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Thu Jan 18 15:24:16 2018 +0100

----------------------------------------------------------------------
 .../partition/SpillableSubpartitionTest.java    | 33 +++++++-------------
 1 file changed, 12 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b3fc7939/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index db94c81..3b00b2e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
@@ -27,10 +26,10 @@ import 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -194,7 +193,7 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
        public void testConsumeSpilledPartition() throws Exception {
                SpillableSubpartition partition = createSubpartition();
 
-               Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), 
FreeingBufferRecycler.INSTANCE);
+               Buffer buffer = TestBufferFactory.createBuffer(4096);
                buffer.retain();
                buffer.retain();
 
@@ -292,7 +291,7 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
        public void testConsumeSpillablePartitionSpilledDuringConsume() throws 
Exception {
                SpillableSubpartition partition = createSubpartition();
 
-               Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), 
FreeingBufferRecycler.INSTANCE);
+               Buffer buffer = TestBufferFactory.createBuffer(4096);
                buffer.retain();
                buffer.retain();
 
@@ -404,8 +403,7 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                assertEquals(1, partition.getTotalNumberOfBuffers());
                assertEquals(4, partition.getTotalNumberOfBytes());
 
-               Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
-                       FreeingBufferRecycler.INSTANCE);
+               Buffer buffer = TestBufferFactory.createBuffer(4096);
                try {
                        partition.add(buffer);
                } finally {
@@ -449,8 +447,7 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                        assertEquals(0, partition.releaseMemory());
                }
 
-               Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
-                       FreeingBufferRecycler.INSTANCE);
+               Buffer buffer = TestBufferFactory.createBuffer(4096);
                boolean bufferRecycled;
                try {
                        partition.add(buffer);
@@ -478,8 +475,7 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                SpillableSubpartition partition = createSubpartition(ioManager);
                assertEquals(0, partition.releaseMemory());
 
-               Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
-                       FreeingBufferRecycler.INSTANCE);
+               Buffer buffer = TestBufferFactory.createBuffer(4096);
                boolean bufferRecycled;
                try {
                        partition.add(buffer);
@@ -525,10 +521,8 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                IOManager ioManager = new 
IOManagerAsyncWithNoOpBufferFileWriter();
                SpillableSubpartition partition = createSubpartition(ioManager);
 
-               Buffer buffer1 = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
-                       FreeingBufferRecycler.INSTANCE);
-               Buffer buffer2 = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
-                       FreeingBufferRecycler.INSTANCE);
+               Buffer buffer1 = TestBufferFactory.createBuffer(4096);
+               Buffer buffer2 = TestBufferFactory.createBuffer(4096);
                try {
                        // we need two buffers because the view will use one of 
them and not release it
                        partition.add(buffer1);
@@ -575,8 +569,7 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
 
                exception.expect(IOException.class);
 
-               Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
-                       FreeingBufferRecycler.INSTANCE);
+               Buffer buffer = TestBufferFactory.createBuffer(4096);
                boolean bufferRecycled;
                try {
                        partition.add(buffer);
@@ -642,10 +635,8 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
        private void testCleanupReleasedPartition(boolean spilled, boolean 
createView) throws Exception {
                SpillableSubpartition partition = createSubpartition();
 
-               Buffer buffer1 = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
-                       FreeingBufferRecycler.INSTANCE);
-               Buffer buffer2 = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
-                       FreeingBufferRecycler.INSTANCE);
+               Buffer buffer1 = TestBufferFactory.createBuffer(4096);
+               Buffer buffer2 = TestBufferFactory.createBuffer(4096);
                boolean buffer1Recycled;
                boolean buffer2Recycled;
                try {

Reply via email to