This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 526236a4d537e78dbac4c575611ef52b602923d9
Author: Zhijiang <wangzhijiang...@aliyun.com>
AuthorDate: Tue Jul 16 23:00:09 2019 +0800

    [FLINK-13245][network] Fix file resource leak when canceling a partition request
    
    On the producer side, the Netty handler receives the CancelPartitionRequest in order to
    release the SubpartitionView resources. The previous implementation tried to find the
    corresponding view via the queue of available readers in PartitionRequestQueue. However,
    the view is not always present in that queue at the time of cancellation, in which case
    the view would never be released.
    
    Furthermore, the release of a ResultPartition and its ResultSubpartitions is based on the
    reference counter in ReleaseOnConsumptionResultPartition, but while handling the
    CancelPartitionRequest in PartitionRequestQueue, the ReleaseOnConsumptionResultPartition
    was never notified of the consumed subpartition. As a result, the reference counter would
    never decrease to 0 to trigger the partition release, which causes a file resource leak
    in the case of BoundedBlockingSubpartition.
    
    To fix both issues, the corresponding view is now released via the collection of all
    readers instead of the queue of available readers, and the release path also calls
    ReleaseOnConsumptionResultPartition#onConsumedSubpartition so that the reference counter
    is decremented.
---
 .../io/network/netty/PartitionRequestQueue.java    | 33 ++++----
 .../network/netty/CancelPartitionRequestTest.java  |  4 +-
 .../network/netty/PartitionRequestQueueTest.java   | 92 ++++++++++++++++++++++
 .../io/network/partition/PartitionTestUtils.java   |  8 ++
 4 files changed, 118 insertions(+), 19 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index f82a42f..b492ea6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -138,9 +138,7 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
                }
 
                for (NetworkSequenceViewReader reader : allReaders.values()) {
-                       reader.notifySubpartitionConsumed();
-                       reader.releaseAllResources();
-                       markAsReleased(reader.getReceiverId());
+                       releaseViewReader(reader);
                }
                allReaders.clear();
        }
@@ -181,19 +179,14 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
                                return;
                        }
 
-                       // Cancel the request for the input channel
-                       int size = availableReaders.size();
-                       for (int i = 0; i < size; i++) {
-                               NetworkSequenceViewReader reader = 
pollAvailableReader();
-                               if (reader.getReceiverId().equals(toCancel)) {
-                                       reader.releaseAllResources();
-                                       markAsReleased(reader.getReceiverId());
-                               } else {
-                                       registerAvailableReader(reader);
-                               }
-                       }
+                       // remove reader from queue of available readers
+                       availableReaders.removeIf(reader -> 
reader.getReceiverId().equals(toCancel));
 
-                       allReaders.remove(toCancel);
+                       // remove reader from queue of all readers and release 
its resource
+                       final NetworkSequenceViewReader toRelease = 
allReaders.remove(toCancel);
+                       if (toRelease != null) {
+                               releaseViewReader(toRelease);
+                       }
                } else {
                        ctx.fireUserEventTriggered(msg);
                }
@@ -308,14 +301,20 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
        private void releaseAllResources() throws IOException {
                // note: this is only ever executed by one thread: the Netty IO 
thread!
                for (NetworkSequenceViewReader reader : allReaders.values()) {
-                       reader.releaseAllResources();
-                       markAsReleased(reader.getReceiverId());
+                       releaseViewReader(reader);
                }
 
                availableReaders.clear();
                allReaders.clear();
        }
 
+       private void releaseViewReader(NetworkSequenceViewReader reader) throws 
IOException {
+               reader.notifySubpartitionConsumed();
+               reader.setRegisteredAsAvailable(false);
+               reader.releaseAllResources();
+               markAsReleased(reader.getReceiverId());
+       }
+
        /**
         * Marks a receiver as released.
         */
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index eca8263..58c02df 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -109,7 +109,7 @@ public class CancelPartitionRequestTest {
                        }
 
                        verify(view, times(1)).releaseAllResources();
-                       verify(view, times(0)).notifySubpartitionConsumed();
+                       verify(view, times(1)).notifySubpartitionConsumed();
                }
                finally {
                        shutdown(serverAndClient);
@@ -168,7 +168,7 @@ public class CancelPartitionRequestTest {
                        NettyTestUtil.awaitClose(ch);
 
                        verify(view, times(1)).releaseAllResources();
-                       verify(view, times(0)).notifySubpartitionConsumed();
+                       verify(view, times(1)).notifySubpartitionConsumed();
                }
                finally {
                        shutdown(serverAndClient);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index a9e8662..a0dbd7a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -19,10 +19,18 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
 import 
org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
 import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
@@ -32,7 +40,11 @@ import 
org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nullable;
 
@@ -54,6 +66,24 @@ import static org.junit.Assert.assertTrue;
  */
 public class PartitionRequestQueueTest {
 
+       @ClassRule
+       public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+       private static final int BUFFER_SIZE = 1024 * 1024;
+
+       private static FileChannelManager fileChannelManager;
+
+       @BeforeClass
+       public static void setUp() throws Exception {
+               fileChannelManager = new FileChannelManagerImpl(
+                       new String[] 
{TEMPORARY_FOLDER.newFolder().getAbsolutePath()}, "testing");
+       }
+
+       @AfterClass
+       public static void shutdown() throws Exception {
+               fileChannelManager.close();
+       }
+
        /**
         * In case of enqueuing an empty reader and a reader that actually has 
some buffers when channel is not writable,
         * on channelWritability change event should result in reading all of 
the messages.
@@ -348,6 +378,68 @@ public class PartitionRequestQueueTest {
                assertNull(channel.readOutbound());
        }
 
+       @Test
+       public void testCancelPartitionRequestForUnavailableView() throws 
Exception {
+               testCancelPartitionRequest(false);
+       }
+
+       @Test
+       public void testCancelPartitionRequestForAvailableView() throws 
Exception {
+               testCancelPartitionRequest(true);
+       }
+
+       private void testCancelPartitionRequest(boolean isAvailableView) throws 
Exception {
+               // setup
+               final ResultPartitionManager partitionManager = new 
ResultPartitionManager();
+               final ResultPartition partition = 
createFinishedPartitionWithFilledData(partitionManager);
+               final InputChannelID receiverId = new InputChannelID();
+               final PartitionRequestQueue queue = new PartitionRequestQueue();
+               final CreditBasedSequenceNumberingViewReader reader = new 
CreditBasedSequenceNumberingViewReader(receiverId, 0, queue);
+               final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+               reader.requestSubpartitionView(partitionManager, 
partition.getPartitionId(), 0);
+               // add this reader into allReaders queue
+               queue.notifyReaderCreated(reader);
+
+               // block the channel so that we see an intermediate state in 
the test
+               blockChannel(channel);
+
+               // add credit to make this reader available for adding into 
availableReaders queue
+               if (isAvailableView) {
+                       queue.addCredit(receiverId, 1);
+                       
assertTrue(queue.getAvailableReaders().contains(reader));
+               }
+
+               // cancel this subpartition view
+               queue.cancel(receiverId);
+               channel.runPendingTasks();
+
+               assertFalse(queue.getAvailableReaders().contains(reader));
+               // the partition and its reader view should all be released
+               assertTrue(reader.isReleased());
+               assertTrue(partition.isReleased());
+               for (ResultSubpartition subpartition : 
partition.getAllPartitions()) {
+                       assertTrue(subpartition.isReleased());
+               }
+
+               // cleanup
+               channel.close();
+       }
+
+       private static ResultPartition 
createFinishedPartitionWithFilledData(ResultPartitionManager partitionManager) 
throws Exception {
+               final ResultPartition partition = new ResultPartitionBuilder()
+                       .setResultPartitionType(ResultPartitionType.BLOCKING)
+                       .setFileChannelManager(fileChannelManager)
+                       .setResultPartitionManager(partitionManager)
+                       .isReleasedOnConsumption(true)
+                       .build();
+
+               partitionManager.registerResultPartition(partition);
+               PartitionTestUtils.writeBuffers(partition, 1, BUFFER_SIZE);
+
+               return partition;
+       }
+
        /**
         * Blocks the given channel by adding a buffer that is bigger than the 
high watermark.
         *
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 5e39a43..5f80421 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.util.EnumSet;
 import java.util.Optional;
 
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -158,4 +159,11 @@ public class PartitionTestUtils {
                        true,
                        releaseType);
        }
+
+       public static void writeBuffers(ResultPartition partition, int 
numberOfBuffers, int bufferSize) throws IOException {
+               for (int i = 0; i < numberOfBuffers; i++) {
+                       
partition.addBufferConsumer(createFilledBufferConsumer(bufferSize, bufferSize), 
0);
+               }
+               partition.finish();
+       }
 }

Reply via email to