[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166324#comment-16166324 ]
ASF GitHub Bot commented on FLINK-7378: --------------------------------------- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r138891582 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java --- @@ -172,44 +178,117 @@ public void testDestroyAll() { } } + /** + * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} + * currently containing the number of required free segments. + */ @Test - public void testRequestAndRecycleMemorySegments() throws Exception { + public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - List<MemorySegment> segments = null; - // request buffers from global pool with illegal argument + List<MemorySegment> memorySegments = Collections.emptyList(); try { - segments = globalPool.requestMemorySegments(0); - fail("Should throw an IllegalArgumentException"); - } catch (IllegalArgumentException e) { - assertNull(segments); + memorySegments = globalPool.requestMemorySegments(numBuffers / 2); + + assertEquals(memorySegments.size(), numBuffers / 2); + } finally { + globalPool.recycleMemorySegments(memorySegments); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } - // common case to request buffers less than the total capacity of global pool - final int numRequiredBuffers = 8; - segments = globalPool.requestMemorySegments(numRequiredBuffers); - - assertNotNull(segments); - assertEquals(segments.size(), numRequiredBuffers); - - // recycle all the requested buffers to global pool - globalPool.recycleMemorySegments(segments); + /** + * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the number of required + * buffers exceeding the capacity of {@link NetworkBufferPool}. + */ + @Test + public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception { + final int numBuffers = 10; - assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); - // uncommon case to request buffers exceeding the total capacity of global pool + List<MemorySegment> memorySegments = Collections.emptyList(); try { - segments = null; - segments = globalPool.requestMemorySegments(11); + memorySegments = globalPool.requestMemorySegments(numBuffers + 1); fail("Should throw an IOException"); } catch (IOException e) { - assertNull(segments); - // recycle all the requested buffers to global pool after exception + assertEquals(memorySegments.size(), 0); assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); } + } + /** + * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the invalid argument to + * cause exception. + */ + @Test + public void testRequestMemorySegmentsWithInvalidArgument() throws Exception { + final int numBuffers = 10; + + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + + List<MemorySegment> memorySegments = Collections.emptyList(); + try { + // the number of requested buffers should be larger than zero + memorySegments = globalPool.requestMemorySegments(0); + fail("Should throw an IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals(memorySegments.size(), 0); + assertEquals(globalPool.getNumberOfAvailableMemorySegments(), numBuffers); + } + } + + /** + * Tests {@link NetworkBufferPool#requestMemorySegments(int)} with the {@link NetworkBufferPool} + * currently not containing the number of required free segments (currently occupied by a buffer pool). + */ + @Test + public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, InterruptedException { + final int numBuffers = 10; + + NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + + final List<Buffer> buffers = new ArrayList<>(numBuffers); + List<MemorySegment> memorySegments = Collections.emptyList(); + Thread bufferRecycler = null; + BufferPool lbp1 = null; + try { + lbp1 = networkBufferPool.createBufferPool(numBuffers / 2, numBuffers); + + // take all buffers (more than the minimum required) + for (int i = 0; i < numBuffers; ++i) { + Buffer buffer = lbp1.requestBuffer(); + buffers.add(buffer); + assertNotNull(buffer); + } + + // if requestMemorySegments() blocks, this will make sure that enough buffers are freed + // eventually for it to continue + bufferRecycler = new Thread(() -> { + try { + Thread.sleep(10000); --- End diff -- Waiting 10s here will increase the probability of us reaching the desired (blocking) code but also makes the test wait quite long. How about the following instead? ``` // requestMemorySegments() below will and wait for buffers // this will make sure that enough buffers are freed eventually for it to continue final OneShotLatch isRunning = new OneShotLatch(); bufferRecycler = new Thread(() -> { try { isRunning.trigger(); Thread.sleep(100); } catch (InterruptedException ignored) { } for (Buffer buffer : buffers) { buffer.recycle(); } }); bufferRecycler.start(); // take more buffers than are freely available at the moment via requestMemorySegments() isRunning.await(); memorySegments = networkBufferPool.requestMemorySegments(numBuffers / 2); ``` That makes is more likely than my original variant which was only waiting 100ms but does not increase the test time too much. > Create a fix size (non rebalancing) buffer pool type for the floating buffers > ----------------------------------------------------------------------------- > > Key: FLINK-7378 > URL: https://issues.apache.org/jira/browse/FLINK-7378 > Project: Flink > Issue Type: Sub-task > Components: Core > Reporter: zhijiang > Assignee: zhijiang > Fix For: 1.4.0 > > > Currently the number of network buffers in {{LocalBufferPool}} for > {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a > is the number of exclusive buffers for each channel and b is the number of > floating buffers shared by all channels. > Considering the credit-based flow control feature, we want to create a fix > size buffer pool used to manage the floating buffers for {{SingleInputGate}}. > And the exclusive buffers are assigned to {{InputChannel}}s directly. -- This message was sent by Atlassian JIRA (v6.4.14#64029)