This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 10f155e  [BP-1.10][FLINK-20013][network] BoundedBlockingSubpartition may leak network buffer if task is failed or canceled
10f155e is described below

commit 10f155e3458161b74da7bf45691247b50175da4b
Author: SteNicholas <programg...@163.com>
AuthorDate: Thu Nov 12 18:03:07 2020 +0800

    [BP-1.10][FLINK-20013][network] BoundedBlockingSubpartition may leak network buffer if task is failed or canceled
---
 .../partition/BoundedBlockingSubpartition.java     | 10 ++++++
 .../partition/BoundedBlockingSubpartitionTest.java | 36 ++++++++++++++++++++++
 2 files changed, 46 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
index 626ba3b..6a898b5 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -194,6 +195,10 @@ final class BoundedBlockingSubpartition extends ResultSubpartition {
 			isReleased = true;
 			isFinished = true; // for fail fast writes
 
+			if (currentBuffer != null) {
+				currentBuffer.close();
+				currentBuffer = null;
+			}
 			checkReaderReferencesAndDispose();
 		}
 	}
@@ -236,6 +241,11 @@ final class BoundedBlockingSubpartition extends ResultSubpartition {
 		}
 	}
 
+	@VisibleForTesting
+	public BufferConsumer getCurrentBuffer() {
+		return currentBuffer;
+	}
+
 	// ------------------------------ legacy ----------------------------------
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
index ce4083f..dbcb4d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
 import org.junit.AfterClass;
@@ -36,6 +38,9 @@ import java.io.IOException;
 
 import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -92,6 +97,37 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase {
 		assertTrue(reader.closed);
 	}
 
+	@Test
+	public void testRecycleCurrentBufferOnFailure() throws Exception {
+		final ResultPartition resultPartition = createPartition(ResultPartitionType.BLOCKING, fileChannelManager);
+		final BoundedBlockingSubpartition subpartition = new BoundedBlockingSubpartition(
+			0,
+			resultPartition,
+			new FailingBoundedData());
+		final BufferConsumer consumer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(100);
+
+		try {
+			try {
+				subpartition.add(consumer);
+				subpartition.createReadView(new NoOpBufferAvailablityListener());
+				fail("should fail with an exception");
+			} catch (Exception ignored) {
+				// expected
+			}
+
+			assertFalse(consumer.isRecycled());
+
+			assertNotNull(subpartition.getCurrentBuffer());
+			assertFalse(subpartition.getCurrentBuffer().isRecycled());
+		} finally {
+			subpartition.release();
+
+			assertTrue(consumer.isRecycled());
+
+			assertNull(subpartition.getCurrentBuffer());
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	@Override