[FLINK-8371][network] always recycle Buffers when releasing SpillableSubpartition
There were places where Buffer instances were not released upon SpillableSubpartition#release() with a view attached to a non-spilled subpartition: 1) SpillableSubpartition#buffers: SpillableSubpartition#release() delegated the recycling to the view, but SpillableSubpartitionView did not clean up the 'buffers' queue (the recycling was only done by the subpartition if there was no view). 2) SpillableSubpartitionView#nextBuffer: if this field was populated when the subpartition was released, the buffer was neither given out in subsequent SpillableSubpartitionView#getNextBuffer() calls (there was a short path returning 'null' here) nor recycled. -> similarly to the PipelinedSubpartition implementation, make SpillableSubpartition#release() always clean up and recycle the buffers -> recycle SpillableSubpartitionView#nextBuffer in SpillableSubpartitionView#releaseAllResources() This closes #5276. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71ede399 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71ede399 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71ede399 Branch: refs/heads/release-1.4 Commit: 71ede3992afe8f6907dd3c6c1e232c5b745048b4 Parents: a316989 Author: Nico Kruber <n...@data-artisans.com> Authored: Fri Jan 5 18:18:35 2018 +0100 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Thu Jan 11 11:27:53 2018 +0100 ---------------------------------------------------------------------- .../partition/PipelinedSubpartition.java | 2 - .../partition/SpillableSubpartition.java | 11 ++- .../partition/SpillableSubpartitionView.java | 7 ++ .../partition/PipelinedSubpartitionTest.java | 72 ++++++++++++++ .../partition/SpillableSubpartitionTest.java | 99 ++++++++++++++++++++ 5 files changed, 185 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/71ede399/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index c1d6f13..92eb7ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -128,7 +128,6 @@ class PipelinedSubpartition extends ResultSubpartition { buffer.recycle(); } - // Get the view... view = readView; readView = null; @@ -138,7 +137,6 @@ class PipelinedSubpartition extends ResultSubpartition { LOG.debug("Released {}.", this); - // Release all resources of the view if (view != null) { view.releaseAllResources(); } http://git-wip-us.apache.org/repos/asf/flink/blob/71ede399/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java index 4a8e165..093e9c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java @@ -138,6 +138,7 @@ class SpillableSubpartition extends ResultSubpartition { @Override public void release() throws IOException { + // view reference accessible outside the lock, but assigned inside the locked scope final ResultSubpartitionView view; synchronized 
(buffers) { @@ -145,16 +146,18 @@ class SpillableSubpartition extends ResultSubpartition { return; } + // Release all available buffers + for (Buffer buffer : buffers) { + buffer.recycle(); + } + buffers.clear(); + view = readView; // No consumer yet, we are responsible to clean everything up. If // one is available, the view is responsible is to clean up (see // below). if (view == null) { - for (Buffer buffer : buffers) { - buffer.recycle(); - } - buffers.clear(); // TODO This can block until all buffers are written out to // disk if a spill is in-progress before deleting the file. http://git-wip-us.apache.org/repos/asf/flink/blob/71ede399/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java index 6781902..f88a6b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java @@ -165,6 +165,13 @@ class SpillableSubpartitionView implements ResultSubpartitionView { if (spilled != null) { spilled.releaseAllResources(); } + // we are never giving this buffer out in getNextBuffer(), so we need to clean it up + synchronized (buffers) { + if (nextBuffer != null) { + nextBuffer.recycle(); + nextBuffer = null; + } + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/71ede399/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index 6d36aa6..81ac40a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -19,16 +19,20 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.util.TestConsumerCallback; import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; import org.apache.flink.runtime.io.network.util.TestProducerSource; import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer; import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer; + import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Test; import java.util.concurrent.ExecutorService; @@ -238,4 +242,72 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { producerResult.get(); consumerResult.get(); } + + + /** + * Tests cleanup of {@link PipelinedSubpartition#release()} with no read view attached. + */ + @Test + public void testCleanupReleasedPartitionNoView() throws Exception { + testCleanupReleasedPartition(false); + } + + /** + * Tests cleanup of {@link PipelinedSubpartition#release()} with a read view attached. 
+ */ + @Test + public void testCleanupReleasedPartitionWithView() throws Exception { + testCleanupReleasedPartition(true); + } + + /** + * Tests cleanup of {@link PipelinedSubpartition#release()}. + * + * @param createView + * whether the partition should have a view attached to it (<tt>true</tt>) or not (<tt>false</tt>) + */ + private void testCleanupReleasedPartition(boolean createView) throws Exception { + PipelinedSubpartition partition = createSubpartition(); + + Buffer buffer1 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), + FreeingBufferRecycler.INSTANCE); + Buffer buffer2 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), + FreeingBufferRecycler.INSTANCE); + boolean buffer1Recycled; + boolean buffer2Recycled; + try { + partition.add(buffer1); + partition.add(buffer2); + // create the read view first + ResultSubpartitionView view = null; + if (createView) { + view = partition.createReadView(numBuffers -> {}); + } + + partition.release(); + + assertTrue(partition.isReleased()); + if (createView) { + assertTrue(view.isReleased()); + } + assertTrue(buffer1.isRecycled()); + } finally { + buffer1Recycled = buffer1.isRecycled(); + if (!buffer1Recycled) { + buffer1.recycle(); + } + buffer2Recycled = buffer2.isRecycled(); + if (!buffer2Recycled) { + buffer2.recycle(); + } + } + if (!buffer1Recycled) { + Assert.fail("buffer 1 not recycled"); + } + if (!buffer2Recycled) { + Assert.fail("buffer 2 not recycled"); + } + assertEquals(2, partition.getTotalNumberOfBuffers()); + assertEquals(2 * 4096, partition.getTotalNumberOfBytes()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/71ede399/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java index c50b361..18169b6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java @@ -541,6 +541,105 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { } } + /** + * Tests cleanup of {@link SpillableSubpartition#release()} with a spillable partition and no + * read view attached. + */ + @Test + public void testCleanupReleasedSpillablePartitionNoView() throws Exception { + testCleanupReleasedPartition(false, false); + } + + /** + * Tests cleanup of {@link SpillableSubpartition#release()} with a spillable partition and a + * read view attached - [FLINK-8371]. + */ + @Test + public void testCleanupReleasedSpillablePartitionWithView() throws Exception { + testCleanupReleasedPartition(false, true); + } + + /** + * Tests cleanup of {@link SpillableSubpartition#release()} with a spilled partition and no + * read view attached. + */ + @Test + public void testCleanupReleasedSpilledPartitionNoView() throws Exception { + testCleanupReleasedPartition(true, false); + } + + /** + * Tests cleanup of {@link SpillableSubpartition#release()} with a spilled partition and a + * read view attached. + */ + @Test + public void testCleanupReleasedSpilledPartitionWithView() throws Exception { + testCleanupReleasedPartition(true, true); + } + + /** + * Tests cleanup of {@link SpillableSubpartition#release()}. 
+ * + * @param spilled + * whether the partition should be spilled to disk (<tt>true</tt>) or not (<tt>false</tt>, + * spillable) + * @param createView + * whether the partition should have a view attached to it (<tt>true</tt>) or not (<tt>false</tt>) + */ + private void testCleanupReleasedPartition(boolean spilled, boolean createView) throws Exception { + SpillableSubpartition partition = createSubpartition(); + + Buffer buffer1 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), + FreeingBufferRecycler.INSTANCE); + Buffer buffer2 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), + FreeingBufferRecycler.INSTANCE); + boolean buffer1Recycled; + boolean buffer2Recycled; + try { + partition.add(buffer1); + partition.add(buffer2); + // create the read view before spilling + // (tests both code paths since this view may then contain the spilled view) + ResultSubpartitionView view = null; + if (createView) { + partition.finish(); + view = partition.createReadView(numBuffers -> {}); + } + if (spilled) { + // note: in case we create a view, one buffer will already reside in the view and + // one EndOfPartitionEvent will be added instead (so overall the number of + // buffers to spill is the same + assertEquals(2, partition.releaseMemory()); + } + + partition.release(); + + assertTrue(partition.isReleased()); + if (createView) { + assertTrue(view.isReleased()); + } + assertTrue(buffer1.isRecycled()); + } finally { + buffer1Recycled = buffer1.isRecycled(); + if (!buffer1Recycled) { + buffer1.recycle(); + } + buffer2Recycled = buffer2.isRecycled(); + if (!buffer2Recycled) { + buffer2.recycle(); + } + } + if (!buffer1Recycled) { + Assert.fail("buffer 1 not recycled"); + } + if (!buffer2Recycled) { + Assert.fail("buffer 2 not recycled"); + } + // note: in case we create a view, there will be an additional EndOfPartitionEvent + assertEquals(createView ? 3 : 2, partition.getTotalNumberOfBuffers()); + assertEquals((createView ? 
4 : 0) + 2 * 4096, partition.getTotalNumberOfBytes()); + } + private static class AwaitableBufferAvailablityListener implements BufferAvailabilityListener { private long numNotifiedBuffers;