This is an automated email from the ASF dual-hosted git repository.

yuanmei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 08f98b7  [FLINK-23724][network] Fix the network buffer leak when 
ResultPartition is released (#16844)
08f98b7 is described below

commit 08f98b748a203097064028e0ea066939d63b18f0
Author: caoyingjie <kevin....@alibaba-inc.com>
AuthorDate: Fri Aug 20 11:10:45 2021 +0800

    [FLINK-23724][network] Fix the network buffer leak when ResultPartition is 
released (#16844)
    
    * [FLINK-23724][network][refactor] Make TaskCanceler call 
ResultPartitionWriter#fail instead of ResultPartitionWriter#close
    
    Originally, the TaskCanceler calls the ResultPartitionWriter#close method 
to release the input and output buffer pools early. However, the 
ResultPartitionWriter#close method can also be called by the task thread to 
release other network resources, which may lead to race conditions. This patch 
makes TaskCanceler call ResultPartitionWriter#fail instead of 
ResultPartitionWriter#close and closes the output buffer pool in 
ResultPartitionWriter#fail, which avoids the potential race conditions.
    
    This closes #16844.
    
    * [FLINK-23724][network] Fix the network buffer leak when ResultPartition 
is released
    
    This patch fixes the network buffer leak issue by closing all 
BufferBuilders in the BufferWritingResultPartition#close method.
    
    This closes #16844.
---
 .../partition/BufferWritingResultPartition.java    | 17 ++++++++
 .../io/network/partition/ResultPartition.java      | 10 ++++-
 .../org/apache/flink/runtime/taskmanager/Task.java | 46 ++++++++++++----------
 .../io/network/partition/ResultPartitionTest.java  | 39 ++++++------------
 4 files changed, 64 insertions(+), 48 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
index b4f2a1e..81e5a72 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
@@ -235,6 +235,23 @@ public abstract class BufferWritingResultPartition extends 
ResultPartition {
         }
     }
 
+    @Override
+    public void close() {
+        // We can not close these buffers in the release method because of the 
potential race
+        // condition. This close method will be only called from the Task 
thread itself.
+        if (broadcastBufferBuilder != null) {
+            broadcastBufferBuilder.close();
+            broadcastBufferBuilder = null;
+        }
+        for (int i = 0; i < unicastBufferBuilders.length; ++i) {
+            if (unicastBufferBuilders[i] != null) {
+                unicastBufferBuilders[i].close();
+                unicastBufferBuilders[i] = null;
+            }
+        }
+        super.close();
+    }
+
     private BufferBuilder appendUnicastDataForNewRecord(
             final ByteBuffer record, final int targetSubpartition) throws 
IOException {
         if (targetSubpartition < 0 || targetSubpartition > 
unicastBufferBuilders.length) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index dc6365c..90d0d92 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -250,15 +250,21 @@ public abstract class ResultPartition implements 
ResultPartitionWriter {
     /** Releases all produced data including both those stored in memory and 
persisted on disk. */
     protected abstract void releaseInternal();
 
-    @Override
-    public void close() {
+    private void closeBufferPool() {
         if (bufferPool != null) {
             bufferPool.lazyDestroy();
         }
     }
 
     @Override
+    public void close() {
+        closeBufferPool();
+    }
+
+    @Override
     public void fail(@Nullable Throwable throwable) {
+        // the task canceler thread will call this method to early release the 
output buffer pool
+        closeBufferPool();
         partitionManager.releasePartition(partitionId, throwable);
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index b0891f8..bde0ca6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -972,12 +972,15 @@ public class Task
 
         for (ResultPartitionWriter partitionWriter : 
consumableNotifyingPartitionWriters) {
             
taskEventDispatcher.unregisterPartition(partitionWriter.getPartitionId());
-            if (isCanceledOrFailed()) {
-                partitionWriter.fail(getFailureCause());
-            }
         }
 
-        closeNetworkResources();
+        // close network resources
+        if (isCanceledOrFailed()) {
+            failAllResultPartitions();
+        }
+        closeAllResultPartitions();
+        closeAllInputGates();
+
         try {
             taskStateManager.close();
         } catch (Exception e) {
@@ -985,11 +988,13 @@ public class Task
         }
     }
 
-    /**
-     * There are two scenarios to close the network resources. One is from 
{@link TaskCanceler} to
-     * early release partitions and gates. Another is from task thread during 
task exiting.
-     */
-    private void closeNetworkResources() {
+    private void failAllResultPartitions() {
+        for (ResultPartitionWriter partitionWriter : 
consumableNotifyingPartitionWriters) {
+            partitionWriter.fail(getFailureCause());
+        }
+    }
+
+    private void closeAllResultPartitions() {
         for (ResultPartitionWriter partitionWriter : 
consumableNotifyingPartitionWriters) {
             try {
                 partitionWriter.close();
@@ -999,14 +1004,14 @@ public class Task
                         "Failed to release result partition for task {}.", 
taskNameWithSubtask, t);
             }
         }
+    }
 
+    private void closeAllInputGates() {
         AbstractInvokable invokable = this.invokable;
         if (invokable == null || !invokable.isUsingNonBlockingInput()) {
-            // Cleanup resources instead of invokable if it is null,
-            // or prevent it from being blocked on input,
-            // or interrupt if it is already blocked.
-            // Not needed for StreamTask (which does NOT use blocking input); 
for which this could
-            // cause race conditions
+            // Cleanup resources instead of invokable if it is null, or 
prevent it from being
+            // blocked on input, or interrupt if it is already blocked. Not 
needed for StreamTask
+            // (which does NOT use blocking input); for which this could cause 
race conditions
             for (InputGate inputGate : inputGates) {
                 try {
                     inputGate.close();
@@ -1182,7 +1187,6 @@ public class Task
                         Runnable canceler =
                                 new TaskCanceler(
                                         LOG,
-                                        this::closeNetworkResources,
                                         taskCancellationTimeout > 0
                                                 ? taskCancellationTimeout
                                                 : 
TaskManagerOptions.TASK_CANCELLATION_TIMEOUT
@@ -1588,10 +1592,9 @@ public class Task
      * This runner calls cancel() on the invokable, closes input-/output 
resources, and initially
      * interrupts the task thread.
      */
-    private static class TaskCanceler implements Runnable {
+    private class TaskCanceler implements Runnable {
 
         private final Logger logger;
-        private final Runnable networkResourcesCloser;
         /** Time to wait after cancellation and interruption before releasing 
network resources. */
         private final long taskCancellationTimeout;
 
@@ -1601,13 +1604,11 @@ public class Task
 
         TaskCanceler(
                 Logger logger,
-                Runnable networkResourcesCloser,
                 long taskCancellationTimeout,
                 AbstractInvokable invokable,
                 Thread executer,
                 String taskName) {
             this.logger = logger;
-            this.networkResourcesCloser = networkResourcesCloser;
             this.taskCancellationTimeout = taskCancellationTimeout;
             this.invokable = invokable;
             this.executer = executer;
@@ -1640,7 +1641,12 @@ public class Task
                 // in order to unblock async Threads, which produce/consume the
                 // intermediate streams outside of the main Task Thread (like
                 // the Kafka consumer).
-                networkResourcesCloser.run();
+                // Notes: 1) This does not mean to release all network 
resources,
+                // the task thread itself will release them; 2) We can not 
close
+                // ResultPartitions here because of possible race conditions 
with
+                // Task thread so we just call the fail here.
+                failAllResultPartitions();
+                closeAllInputGates();
 
             } catch (Throwable t) {
                 ExceptionUtils.rethrowIfFatalError(t);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index b299edc..7feb002 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -343,50 +343,37 @@ public class ResultPartitionTest {
         }
     }
 
-    @Test
-    public void testReleaseMemoryOnPipelinedPartition() throws Exception {
-        testReleaseMemory(ResultPartitionType.PIPELINED);
-    }
-
     /**
-     * Tests {@link ResultPartition#releaseMemory(int)} on a working partition.
-     *
-     * @param resultPartitionType the result partition type to set up
+     * Tests {@link ResultPartition#close()} and {@link 
ResultPartition#release()} on a working
+     * pipelined partition.
      */
-    private void testReleaseMemory(final ResultPartitionType 
resultPartitionType) throws Exception {
+    @Test
+    public void testReleaseMemoryOnPipelinedPartition() throws Exception {
         final int numAllBuffers = 10;
         final NettyShuffleEnvironment network =
                 new NettyShuffleEnvironmentBuilder()
                         .setNumNetworkBuffers(numAllBuffers)
                         .setBufferSize(bufferSize)
                         .build();
-        final ResultPartition resultPartition = createPartition(network, 
resultPartitionType, 1);
+        final ResultPartition resultPartition =
+                createPartition(network, ResultPartitionType.PIPELINED, 1);
         try {
             resultPartition.setup();
 
             // take all buffers (more than the minimum required)
             for (int i = 0; i < numAllBuffers; ++i) {
-                resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
+                resultPartition.emitRecord(ByteBuffer.allocate(bufferSize - 
1), 0);
             }
-            resultPartition.finish();
-
             assertEquals(0, 
resultPartition.getBufferPool().getNumberOfAvailableMemorySegments());
 
-            // reset the pool size less than the number of requested buffers
-            final int numLocalBuffers = 4;
-            resultPartition.getBufferPool().setNumBuffers(numLocalBuffers);
+            resultPartition.close();
+            assertTrue(resultPartition.getBufferPool().isDestroyed());
+            assertEquals(
+                    numAllBuffers, 
network.getNetworkBufferPool().getNumberOfUsedMemorySegments());
 
-            // partition with blocking type should release excess buffers
-            if (!resultPartitionType.hasBackPressure()) {
-                assertEquals(
-                        numLocalBuffers,
-                        
resultPartition.getBufferPool().getNumberOfAvailableMemorySegments());
-            } else {
-                assertEquals(
-                        0, 
resultPartition.getBufferPool().getNumberOfAvailableMemorySegments());
-            }
-        } finally {
             resultPartition.release();
+            assertEquals(0, 
network.getNetworkBufferPool().getNumberOfUsedMemorySegments());
+        } finally {
             network.close();
         }
     }

Reply via email to