Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/5558#discussion_r170028724 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java --- @@ -396,32 +399,49 @@ public void testAvailableBuffersLessThanRequiredBuffers() throws Exception { 18, inputChannel.getNumberOfRequiredBuffers()); assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + assertTrue(inputChannel.isWaitingForFloatingBuffers()); // Decrease the backlog - inputChannel.onSenderBacklog(15); + inputChannel.onSenderBacklog(13); // Only the number of required buffers is changed by (backlog + numExclusiveBuffers) verify(bufferPool, times(15)).requestBuffer(); verify(bufferPool, times(1)).addBufferListener(inputChannel); assertEquals("There should be 15 buffers available in the channel", 15, inputChannel.getNumberOfAvailableBuffers()); - assertEquals("There should be 17 buffers required in the channel", - 17, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 15 buffers required in the channel", + 15, inputChannel.getNumberOfRequiredBuffers()); assertEquals("There should be 0 buffer available in local pool", 0, bufferPool.getNumberOfAvailableMemorySegments()); + assertTrue(inputChannel.isWaitingForFloatingBuffers()); - // Recycle one exclusive buffer - exclusiveBuffer.recycleBuffer(); + // Recycle one more floating buffer + floatingBufferQueue.poll().recycleBuffer(); - // The exclusive buffer is returned to the channel directly + // Return the floating buffer to the buffer pool and the channel is not waiting for more floating buffers verify(bufferPool, times(15)).requestBuffer(); verify(bufferPool, times(1)).addBufferListener(inputChannel); + assertEquals("There should be 15 buffers available in the channel", + 15, inputChannel.getNumberOfAvailableBuffers()); + assertEquals("There should be 15 buffers required in the channel", + 15, inputChannel.getNumberOfRequiredBuffers()); + assertEquals("There should be 1 buffers available in local pool", + 1, bufferPool.getNumberOfAvailableMemorySegments()); + assertFalse(inputChannel.isWaitingForFloatingBuffers()); + + // Increase the backlog again + inputChannel.onSenderBacklog(15); + + // The floating buffer is requested from the buffer pool and the channel is registered as listener again. + verify(bufferPool, times(17)).requestBuffer(); + verify(bufferPool, times(2)).addBufferListener(inputChannel); assertEquals("There should be 16 buffers available in the channel", 16, inputChannel.getNumberOfAvailableBuffers()); assertEquals("There should be 17 buffers required in the channel", 17, inputChannel.getNumberOfRequiredBuffers()); - assertEquals("There should be 0 buffers available in local pool", + assertEquals("There should be 0 buffer available in local pool", --- End diff -- `buffers` is correct here
---