zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r389356547
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ########## @@ -419,6 +458,110 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception { } } + @Test + public void testReadBufferResponseBeforeReleasingChannel() throws Exception { + testReadBufferResponseBeforeReleasingOrRemovingChannel(false); + } + + @Test + public void testReadBufferResponseBeforeRemovingChannel() throws Exception { + testReadBufferResponseBeforeReleasingOrRemovingChannel(true); + } + + @Test + public void testReadBufferResponseAfterReleasingChannel() throws Exception { + testReadBufferResponseAfterReleasingAndRemovingChannel(false); + } + + @Test + public void testReadBufferResponseAfterRemovingChannel() throws Exception { + testReadBufferResponseAfterReleasingAndRemovingChannel(true); + } + + private void testReadBufferResponseBeforeReleasingOrRemovingChannel(boolean isRemoved) throws Exception { + int bufferSize = 1024; + + NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, bufferSize, 2); + SingleInputGate inputGate = createSingleInputGate(1); + RemoteInputChannel inputChannel = new InputChannelBuilder() + .setMemorySegmentProvider(networkBufferPool) + .buildRemoteAndSetToGate(inputGate); + inputGate.assignExclusiveSegments(); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + EmbeddedChannel embeddedChannel = new EmbeddedChannel(handler); + handler.addInputChannel(inputChannel); + + try { + BufferResponse bufferResponse = createBufferResponse( + TestBufferFactory.createBuffer(bufferSize), + 0, + inputChannel.getInputChannelId(), + 1, + new NetworkBufferAllocator(handler)); + + // Release the channel. + inputGate.close(); + if (isRemoved) { + handler.removeInputChannel(inputChannel); + } + + handler.channelRead(null, bufferResponse); + + assertEquals(0, inputChannel.getNumberOfQueuedBuffers()); + assertNotNull(bufferResponse.getBuffer()); + assertTrue(bufferResponse.getBuffer().isRecycled()); + + embeddedChannel.runScheduledPendingTasks(); + NettyMessage.CancelPartitionRequest cancelPartitionRequest = embeddedChannel.readOutbound(); + assertNotNull(cancelPartitionRequest); + assertEquals(inputChannel.getInputChannelId(), cancelPartitionRequest.receiverId); + } finally { + releaseResource(inputGate, networkBufferPool); + } + } + + private void testReadBufferResponseAfterReleasingAndRemovingChannel(boolean isRemoved) throws Exception { Review comment: It is better to deduplicate this method with above `testReadBufferResponseBeforeReleasingOrRemovingChannel`, because there are almost the same codes except the sequence to trigger release. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services