This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 88a161101bb [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest 88a161101bb is described below commit 88a161101bb33b4c088325788bd11d41f9369355 Author: Yun Gao <gaoyunhen...@gmail.com> AuthorDate: Mon Dec 7 18:06:34 2020 +0800 [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest This closes #14319. --- .../io/network/netty/NettyBufferPoolTest.java | 58 ++++++++++++++++------ 1 file changed, 44 insertions(+), 14 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java index 3797d025e77..0aaa11ab64c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java @@ -18,36 +18,61 @@ package org.apache.flink.runtime.io.network.netty; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; + +import org.junit.After; import org.junit.Test; +import java.util.ArrayList; +import java.util.List; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; /** Tests for the {@link NettyBufferPool} wrapper. */ public class NettyBufferPoolTest { + private final List<ByteBuf> needReleasing = new ArrayList<>(); + + @After + public void tearDown() { + try { + // Release all of the buffers. + for (ByteBuf buf : needReleasing) { + buf.release(); + } + + // Checks in a separate loop in case we have sliced buffers. + for (ByteBuf buf : needReleasing) { + assertEquals(0, buf.refCnt()); + } + } finally { + needReleasing.clear(); + } + } + @Test public void testNoHeapAllocations() throws Exception { - NettyBufferPool nettyBufferPool = new NettyBufferPool(1); + final NettyBufferPool nettyBufferPool = new NettyBufferPool(1); // Buffers should prefer to be direct - assertTrue(nettyBufferPool.buffer().isDirect()); - assertTrue(nettyBufferPool.buffer(128).isDirect()); - assertTrue(nettyBufferPool.buffer(128, 256).isDirect()); + assertTrue(releaseLater(nettyBufferPool.buffer()).isDirect()); + assertTrue(releaseLater(nettyBufferPool.buffer(128)).isDirect()); + assertTrue(releaseLater(nettyBufferPool.buffer(128, 256)).isDirect()); // IO buffers should prefer to be direct - assertTrue(nettyBufferPool.ioBuffer().isDirect()); - assertTrue(nettyBufferPool.ioBuffer(128).isDirect()); - assertTrue(nettyBufferPool.ioBuffer(128, 256).isDirect()); + assertTrue(releaseLater(nettyBufferPool.ioBuffer()).isDirect()); + assertTrue(releaseLater(nettyBufferPool.ioBuffer(128)).isDirect()); + assertTrue(releaseLater(nettyBufferPool.ioBuffer(128, 256)).isDirect()); // Currently we fakes the heap buffer allocation with direct buffers - assertTrue(nettyBufferPool.heapBuffer().isDirect()); - assertTrue(nettyBufferPool.heapBuffer(128).isDirect()); - assertTrue(nettyBufferPool.heapBuffer(128, 256).isDirect()); + assertTrue(releaseLater(nettyBufferPool.heapBuffer()).isDirect()); + assertTrue(releaseLater(nettyBufferPool.heapBuffer(128)).isDirect()); + assertTrue(releaseLater(nettyBufferPool.heapBuffer(128, 256)).isDirect()); // Composite buffers allocates the corresponding type of buffers when extending its capacity - assertTrue(nettyBufferPool.compositeHeapBuffer().capacity(1024).isDirect()); - assertTrue(nettyBufferPool.compositeHeapBuffer(10).capacity(1024).isDirect()); + assertTrue(releaseLater(nettyBufferPool.compositeHeapBuffer()).capacity(1024).isDirect()); + assertTrue(releaseLater(nettyBufferPool.compositeHeapBuffer(10)).capacity(1024).isDirect()); // Is direct buffer pooled! assertTrue(nettyBufferPool.isDirectBufferPooled()); @@ -60,16 +85,21 @@ public class NettyBufferPoolTest { { // Single large buffer allocates one chunk - nettyBufferPool.directBuffer(chunkSize - 64); + releaseLater(nettyBufferPool.directBuffer(chunkSize - 64)); long allocated = nettyBufferPool.getNumberOfAllocatedBytes().get(); assertEquals(chunkSize, allocated); } { // Allocate a little more (one more chunk required) - nettyBufferPool.directBuffer(128); + releaseLater(nettyBufferPool.directBuffer(128)); long allocated = nettyBufferPool.getNumberOfAllocatedBytes().get(); assertEquals(2 * chunkSize, allocated); } } + + private ByteBuf releaseLater(ByteBuf buf) { + needReleasing.add(buf); + return buf; + } }