This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 10f155e  [BP-1.10][FLINK-20013][network] BoundedBlockingSubpartition may leak network buffer if task is failed or canceled
10f155e is described below

commit 10f155e3458161b74da7bf45691247b50175da4b
Author: SteNicholas <programg...@163.com>
AuthorDate: Thu Nov 12 18:03:07 2020 +0800

    [BP-1.10][FLINK-20013][network] BoundedBlockingSubpartition may leak network buffer if task is failed or canceled
---
 .../partition/BoundedBlockingSubpartition.java     | 10 ++++++
 .../partition/BoundedBlockingSubpartitionTest.java | 36 ++++++++++++++++++++++
 2 files changed, 46 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
index 626ba3b..6a898b5 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -194,6 +195,10 @@ final class BoundedBlockingSubpartition extends ResultSubpartition {
                        isReleased = true;
                        isFinished = true; // for fail fast writes
 
+                       if (currentBuffer != null) {
+                               currentBuffer.close();
+                               currentBuffer = null;
+                       }
                        checkReaderReferencesAndDispose();
                }
        }
@@ -236,6 +241,11 @@ final class BoundedBlockingSubpartition extends ResultSubpartition {
                }
        }
 
+       @VisibleForTesting
+       public BufferConsumer getCurrentBuffer() {
+               return currentBuffer;
+       }
+
        // ------------------------------ legacy ----------------------------------
 
        @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
index ce4083f..dbcb4d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
 import org.junit.AfterClass;
@@ -36,6 +38,9 @@ import java.io.IOException;
 
 import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -92,6 +97,37 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase {
                assertTrue(reader.closed);
        }
 
+       @Test
+       public void testRecycleCurrentBufferOnFailure() throws Exception {
+               final ResultPartition resultPartition = createPartition(ResultPartitionType.BLOCKING, fileChannelManager);
+               final BoundedBlockingSubpartition subpartition = new BoundedBlockingSubpartition(
+                               0,
+                               resultPartition,
+                               new FailingBoundedData());
+               final BufferConsumer consumer = BufferBuilderTestUtils.createFilledFinishedBufferConsumer(100);
+
+               try {
+                       try {
+                               subpartition.add(consumer);
+                               subpartition.createReadView(new NoOpBufferAvailablityListener());
+                               fail("should fail with an exception");
+                       } catch (Exception ignored) {
+                               // expected
+                       }
+
+                       assertFalse(consumer.isRecycled());
+
+                       assertNotNull(subpartition.getCurrentBuffer());
+                       assertFalse(subpartition.getCurrentBuffer().isRecycled());
+               } finally {
+                       subpartition.release();
+
+                       assertTrue(consumer.isRecycled());
+
+                       assertNull(subpartition.getCurrentBuffer());
+               }
+       }
+
        // ------------------------------------------------------------------------
 
        @Override

Reply via email to