This is an automated email from the ASF dual-hosted git repository. yuanmei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 08f98b7 [FLINK-23724][network] Fix the network buffer leak when ResultPartition is released (#16844) 08f98b7 is described below commit 08f98b748a203097064028e0ea066939d63b18f0 Author: caoyingjie <kevin....@alibaba-inc.com> AuthorDate: Fri Aug 20 11:10:45 2021 +0800 [FLINK-23724][network] Fix the network buffer leak when ResultPartition is released (#16844) * [FLINK-23724][network][refactor] Make TaskCanceler call ResultPartitionWriter#fail instead of ResultPartitionWriter#close Originally, the TaskCanceler called the ResultPartitionWriter#close method to release the input and output buffer pools early. However, the ResultPartitionWriter#close method can also be called by the task thread to release other network resources, which may lead to race conditions. This patch makes TaskCanceler call ResultPartitionWriter#fail instead of ResultPartitionWriter#close, and closes the output buffer pool in ResultPartitionWriter#fail, which avoids the potential race conditions. This closes #16844. * [FLINK-23724][network] Fix the network buffer leak when ResultPartition is released This patch fixes the network buffer leak issue by closing all BufferBuilders in the BufferWritingResultPartition#close method. This closes #16844. 
--- .../partition/BufferWritingResultPartition.java | 17 ++++++++ .../io/network/partition/ResultPartition.java | 10 ++++- .../org/apache/flink/runtime/taskmanager/Task.java | 46 ++++++++++++---------- .../io/network/partition/ResultPartitionTest.java | 39 ++++++------------ 4 files changed, 64 insertions(+), 48 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java index b4f2a1e..81e5a72 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java @@ -235,6 +235,23 @@ public abstract class BufferWritingResultPartition extends ResultPartition { } } + @Override + public void close() { + // We can not close these buffers in the release method because of the potential race + // condition. This close method will be only called from the Task thread itself. 
+ if (broadcastBufferBuilder != null) { + broadcastBufferBuilder.close(); + broadcastBufferBuilder = null; + } + for (int i = 0; i < unicastBufferBuilders.length; ++i) { + if (unicastBufferBuilders[i] != null) { + unicastBufferBuilders[i].close(); + unicastBufferBuilders[i] = null; + } + } + super.close(); + } + private BufferBuilder appendUnicastDataForNewRecord( final ByteBuffer record, final int targetSubpartition) throws IOException { if (targetSubpartition < 0 || targetSubpartition > unicastBufferBuilders.length) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index dc6365c..90d0d92 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -250,15 +250,21 @@ public abstract class ResultPartition implements ResultPartitionWriter { /** Releases all produced data including both those stored in memory and persisted on disk. 
*/ protected abstract void releaseInternal(); - @Override - public void close() { + private void closeBufferPool() { if (bufferPool != null) { bufferPool.lazyDestroy(); } } @Override + public void close() { + closeBufferPool(); + } + + @Override public void fail(@Nullable Throwable throwable) { + // the task canceler thread will call this method to early release the output buffer pool + closeBufferPool(); partitionManager.releasePartition(partitionId, throwable); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index b0891f8..bde0ca6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -972,12 +972,15 @@ public class Task for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) { taskEventDispatcher.unregisterPartition(partitionWriter.getPartitionId()); - if (isCanceledOrFailed()) { - partitionWriter.fail(getFailureCause()); - } } - closeNetworkResources(); + // close network resources + if (isCanceledOrFailed()) { + failAllResultPartitions(); + } + closeAllResultPartitions(); + closeAllInputGates(); + try { taskStateManager.close(); } catch (Exception e) { @@ -985,11 +988,13 @@ public class Task } } - /** - * There are two scenarios to close the network resources. One is from {@link TaskCanceler} to - * early release partitions and gates. Another is from task thread during task exiting. 
- */ - private void closeNetworkResources() { + private void failAllResultPartitions() { + for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) { + partitionWriter.fail(getFailureCause()); + } + } + + private void closeAllResultPartitions() { for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) { try { partitionWriter.close(); @@ -999,14 +1004,14 @@ public class Task "Failed to release result partition for task {}.", taskNameWithSubtask, t); } } + } + private void closeAllInputGates() { AbstractInvokable invokable = this.invokable; if (invokable == null || !invokable.isUsingNonBlockingInput()) { - // Cleanup resources instead of invokable if it is null, - // or prevent it from being blocked on input, - // or interrupt if it is already blocked. - // Not needed for StreamTask (which does NOT use blocking input); for which this could - // cause race conditions + // Cleanup resources instead of invokable if it is null, or prevent it from being + // blocked on input, or interrupt if it is already blocked. Not needed for StreamTask + // (which does NOT use blocking input); for which this could cause race conditions for (InputGate inputGate : inputGates) { try { inputGate.close(); @@ -1182,7 +1187,6 @@ public class Task Runnable canceler = new TaskCanceler( LOG, - this::closeNetworkResources, taskCancellationTimeout > 0 ? taskCancellationTimeout : TaskManagerOptions.TASK_CANCELLATION_TIMEOUT @@ -1588,10 +1592,9 @@ public class Task * This runner calls cancel() on the invokable, closes input-/output resources, and initially * interrupts the task thread. */ - private static class TaskCanceler implements Runnable { + private class TaskCanceler implements Runnable { private final Logger logger; - private final Runnable networkResourcesCloser; /** Time to wait after cancellation and interruption before releasing network resources. 
*/ private final long taskCancellationTimeout; @@ -1601,13 +1604,11 @@ public class Task TaskCanceler( Logger logger, - Runnable networkResourcesCloser, long taskCancellationTimeout, AbstractInvokable invokable, Thread executer, String taskName) { this.logger = logger; - this.networkResourcesCloser = networkResourcesCloser; this.taskCancellationTimeout = taskCancellationTimeout; this.invokable = invokable; this.executer = executer; @@ -1640,7 +1641,12 @@ public class Task // in order to unblock async Threads, which produce/consume the // intermediate streams outside of the main Task Thread (like // the Kafka consumer). - networkResourcesCloser.run(); + // Notes: 1) This does not mean to release all network resources, + // the task thread itself will release them; 2) We can not close + // ResultPartitions here because of possible race conditions with + // Task thread so we just call the fail here. + failAllResultPartitions(); + closeAllInputGates(); } catch (Throwable t) { ExceptionUtils.rethrowIfFatalError(t); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java index b299edc..7feb002 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java @@ -343,50 +343,37 @@ public class ResultPartitionTest { } } - @Test - public void testReleaseMemoryOnPipelinedPartition() throws Exception { - testReleaseMemory(ResultPartitionType.PIPELINED); - } - /** - * Tests {@link ResultPartition#releaseMemory(int)} on a working partition. - * - * @param resultPartitionType the result partition type to set up + * Tests {@link ResultPartition#close()} and {@link ResultPartition#release()} on a working + * pipelined partition. 
*/ - private void testReleaseMemory(final ResultPartitionType resultPartitionType) throws Exception { + @Test + public void testReleaseMemoryOnPipelinedPartition() throws Exception { final int numAllBuffers = 10; final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder() .setNumNetworkBuffers(numAllBuffers) .setBufferSize(bufferSize) .build(); - final ResultPartition resultPartition = createPartition(network, resultPartitionType, 1); + final ResultPartition resultPartition = + createPartition(network, ResultPartitionType.PIPELINED, 1); try { resultPartition.setup(); // take all buffers (more than the minimum required) for (int i = 0; i < numAllBuffers; ++i) { - resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0); + resultPartition.emitRecord(ByteBuffer.allocate(bufferSize - 1), 0); } - resultPartition.finish(); - assertEquals(0, resultPartition.getBufferPool().getNumberOfAvailableMemorySegments()); - // reset the pool size less than the number of requested buffers - final int numLocalBuffers = 4; - resultPartition.getBufferPool().setNumBuffers(numLocalBuffers); + resultPartition.close(); + assertTrue(resultPartition.getBufferPool().isDestroyed()); + assertEquals( + numAllBuffers, network.getNetworkBufferPool().getNumberOfUsedMemorySegments()); - // partition with blocking type should release excess buffers - if (!resultPartitionType.hasBackPressure()) { - assertEquals( - numLocalBuffers, - resultPartition.getBufferPool().getNumberOfAvailableMemorySegments()); - } else { - assertEquals( - 0, resultPartition.getBufferPool().getNumberOfAvailableMemorySegments()); - } - } finally { resultPartition.release(); + assertEquals(0, network.getNetworkBufferPool().getNumberOfUsedMemorySegments()); + } finally { network.close(); } }