[FLINK-8371][network] always recycle Buffers when releasing 
SpillableSubpartition

There were places where Buffer instances were not released upon
SpillableSubpartition#release() with a view attached to a non-spilled
subpartition:

1) SpillableSubpartition#buffers:
  SpillableSubpartition#release() delegates the recycling to the view, but
  SpillableSubpartitionView does not clean up the 'buffers' queue (the
  recycling was only done by the subpartition if there was no view).
2) SpillableSubpartitionView#nextBuffer:
  If this field was populated when the subpartition was released, it was
  neither given out in subsequent SpillableSubpartitionView#getNextBuffer()
  calls (there was a short path returning 'null' here) nor recycled.

-> similarly to the PipelinedSubpartition implementation, make
   SpillableSubpartition#release() always clean up and recycle the buffers
-> recycle SpillableSubpartitionView#nextBuffer in
   SpillableSubpartitionView#releaseAllResources()

This closes #5276.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71ede399
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71ede399
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71ede399

Branch: refs/heads/release-1.4
Commit: 71ede3992afe8f6907dd3c6c1e232c5b745048b4
Parents: a316989
Author: Nico Kruber <n...@data-artisans.com>
Authored: Fri Jan 5 18:18:35 2018 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Thu Jan 11 11:27:53 2018 +0100

----------------------------------------------------------------------
 .../partition/PipelinedSubpartition.java        |  2 -
 .../partition/SpillableSubpartition.java        | 11 ++-
 .../partition/SpillableSubpartitionView.java    |  7 ++
 .../partition/PipelinedSubpartitionTest.java    | 72 ++++++++++++++
 .../partition/SpillableSubpartitionTest.java    | 99 ++++++++++++++++++++
 5 files changed, 185 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/71ede399/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index c1d6f13..92eb7ce 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -128,7 +128,6 @@ class PipelinedSubpartition extends ResultSubpartition {
                                buffer.recycle();
                        }
 
-                       // Get the view...
                        view = readView;
                        readView = null;
 
@@ -138,7 +137,6 @@ class PipelinedSubpartition extends ResultSubpartition {
 
                LOG.debug("Released {}.", this);
 
-               // Release all resources of the view
                if (view != null) {
                        view.releaseAllResources();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/71ede399/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 4a8e165..093e9c8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -138,6 +138,7 @@ class SpillableSubpartition extends ResultSubpartition {
 
        @Override
        public void release() throws IOException {
+               // view reference accessible outside the lock, but assigned 
inside the locked scope
                final ResultSubpartitionView view;
 
                synchronized (buffers) {
@@ -145,16 +146,18 @@ class SpillableSubpartition extends ResultSubpartition {
                                return;
                        }
 
+                       // Release all available buffers
+                       for (Buffer buffer : buffers) {
+                               buffer.recycle();
+                       }
+                       buffers.clear();
+
                        view = readView;
 
                        // No consumer yet, we are responsible to clean 
everything up. If
                        // one is available, the view is responsible is to 
clean up (see
                        // below).
                        if (view == null) {
-                               for (Buffer buffer : buffers) {
-                                       buffer.recycle();
-                               }
-                               buffers.clear();
 
                                // TODO This can block until all buffers are 
written out to
                                // disk if a spill is in-progress before 
deleting the file.

http://git-wip-us.apache.org/repos/asf/flink/blob/71ede399/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 6781902..f88a6b6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -165,6 +165,13 @@ class SpillableSubpartitionView implements 
ResultSubpartitionView {
                        if (spilled != null) {
                                spilled.releaseAllResources();
                        }
+                       // we are never giving this buffer out in 
getNextBuffer(), so we need to clean it up
+                       synchronized (buffers) {
+                               if (nextBuffer != null) {
+                                       nextBuffer.recycle();
+                                       nextBuffer = null;
+                               }
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71ede399/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 6d36aa6..81ac40a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -19,16 +19,20 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
+
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.concurrent.ExecutorService;
@@ -238,4 +242,72 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
                producerResult.get();
                consumerResult.get();
        }
+
+
+       /**
+        * Tests cleanup of {@link PipelinedSubpartition#release()} with no 
read view attached.
+        */
+       @Test
+       public void testCleanupReleasedPartitionNoView() throws Exception {
+               testCleanupReleasedPartition(false);
+       }
+
+       /**
+        * Tests cleanup of {@link PipelinedSubpartition#release()} with a read 
view attached.
+        */
+       @Test
+       public void testCleanupReleasedPartitionWithView() throws Exception {
+               testCleanupReleasedPartition(true);
+       }
+
+       /**
+        * Tests cleanup of {@link PipelinedSubpartition#release()}.
+        *
+        * @param createView
+        *              whether the partition should have a view attached to it 
(<tt>true</tt>) or not (<tt>false</tt>)
+        */
+       private void testCleanupReleasedPartition(boolean createView) throws 
Exception {
+               PipelinedSubpartition partition = createSubpartition();
+
+               Buffer buffer1 = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+                       FreeingBufferRecycler.INSTANCE);
+               Buffer buffer2 = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+                       FreeingBufferRecycler.INSTANCE);
+               boolean buffer1Recycled;
+               boolean buffer2Recycled;
+               try {
+                       partition.add(buffer1);
+                       partition.add(buffer2);
+                       // create the read view first
+                       ResultSubpartitionView view = null;
+                       if (createView) {
+                               view = partition.createReadView(numBuffers -> 
{});
+                       }
+
+                       partition.release();
+
+                       assertTrue(partition.isReleased());
+                       if (createView) {
+                               assertTrue(view.isReleased());
+                       }
+                       assertTrue(buffer1.isRecycled());
+               } finally {
+                       buffer1Recycled = buffer1.isRecycled();
+                       if (!buffer1Recycled) {
+                               buffer1.recycle();
+                       }
+                       buffer2Recycled = buffer2.isRecycled();
+                       if (!buffer2Recycled) {
+                               buffer2.recycle();
+                       }
+               }
+               if (!buffer1Recycled) {
+                       Assert.fail("buffer 1 not recycled");
+               }
+               if (!buffer2Recycled) {
+                       Assert.fail("buffer 2 not recycled");
+               }
+               assertEquals(2, partition.getTotalNumberOfBuffers());
+               assertEquals(2 * 4096, partition.getTotalNumberOfBytes());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71ede399/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index c50b361..18169b6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -541,6 +541,105 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
                }
        }
 
+       /**
+        * Tests cleanup of {@link SpillableSubpartition#release()} with a 
spillable partition and no
+        * read view attached.
+        */
+       @Test
+       public void testCleanupReleasedSpillablePartitionNoView() throws 
Exception {
+               testCleanupReleasedPartition(false, false);
+       }
+
+       /**
+        * Tests cleanup of {@link SpillableSubpartition#release()} with a 
spillable partition and a
+        * read view attached - [FLINK-8371].
+        */
+       @Test
+       public void testCleanupReleasedSpillablePartitionWithView() throws 
Exception {
+               testCleanupReleasedPartition(false, true);
+       }
+
+       /**
+        * Tests cleanup of {@link SpillableSubpartition#release()} with a 
spilled partition and no
+        * read view attached.
+        */
+       @Test
+       public void testCleanupReleasedSpilledPartitionNoView() throws 
Exception {
+               testCleanupReleasedPartition(true, false);
+       }
+
+       /**
+        * Tests cleanup of {@link SpillableSubpartition#release()} with a 
spilled partition and a
+        * read view attached.
+        */
+       @Test
+       public void testCleanupReleasedSpilledPartitionWithView() throws 
Exception {
+               testCleanupReleasedPartition(true, true);
+       }
+
+       /**
+        * Tests cleanup of {@link SpillableSubpartition#release()}.
+        *
+        * @param spilled
+        *              whether the partition should be spilled to disk 
(<tt>true</tt>) or not (<tt>false</tt>,
+        *              spillable)
+        * @param createView
+        *              whether the partition should have a view attached to it 
(<tt>true</tt>) or not (<tt>false</tt>)
+        */
+       private void testCleanupReleasedPartition(boolean spilled, boolean 
createView) throws Exception {
+               SpillableSubpartition partition = createSubpartition();
+
+               Buffer buffer1 = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+                       FreeingBufferRecycler.INSTANCE);
+               Buffer buffer2 = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+                       FreeingBufferRecycler.INSTANCE);
+               boolean buffer1Recycled;
+               boolean buffer2Recycled;
+               try {
+                       partition.add(buffer1);
+                       partition.add(buffer2);
+                       // create the read view before spilling
+                       // (tests both code paths since this view may then 
contain the spilled view)
+                       ResultSubpartitionView view = null;
+                       if (createView) {
+                               partition.finish();
+                               view = partition.createReadView(numBuffers -> 
{});
+                       }
+                       if (spilled) {
+                               // note: in case we create a view, one buffer 
will already reside in the view and
+                               //       one EndOfPartitionEvent will be added 
instead (so overall the number of
+                               //       buffers to spill is the same
+                               assertEquals(2, partition.releaseMemory());
+                       }
+
+                       partition.release();
+
+                       assertTrue(partition.isReleased());
+                       if (createView) {
+                               assertTrue(view.isReleased());
+                       }
+                       assertTrue(buffer1.isRecycled());
+               } finally {
+                       buffer1Recycled = buffer1.isRecycled();
+                       if (!buffer1Recycled) {
+                               buffer1.recycle();
+                       }
+                       buffer2Recycled = buffer2.isRecycled();
+                       if (!buffer2Recycled) {
+                               buffer2.recycle();
+                       }
+               }
+               if (!buffer1Recycled) {
+                       Assert.fail("buffer 1 not recycled");
+               }
+               if (!buffer2Recycled) {
+                       Assert.fail("buffer 2 not recycled");
+               }
+               // note: in case we create a view, there will be an additional 
EndOfPartitionEvent
+               assertEquals(createView ? 3 : 2, 
partition.getTotalNumberOfBuffers());
+               assertEquals((createView ? 4 : 0) + 2 * 4096, 
partition.getTotalNumberOfBytes());
+       }
+
        private static class AwaitableBufferAvailablityListener implements 
BufferAvailabilityListener {
 
                private long numNotifiedBuffers;

Reply via email to