gaoyunhaii commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r386120605
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
 ##########
 @@ -430,23 +431,35 @@ private static void releaseResource(SingleInputGate 
inputGate, NetworkBufferPool
        /**
         * Returns a deserialized buffer message as it would be received during 
runtime.
         */
-       private static BufferResponse createBufferResponse(
+       private BufferResponse createBufferResponse(
                        Buffer buffer,
                        int sequenceNumber,
-                       InputChannelID receivingChannelId,
-                       int backlog) throws IOException {
+                       RemoteInputChannel receivingChannel,
+                       int backlog,
+                       CreditBasedPartitionRequestClientHandler clientHandler) 
throws IOException {
+
                // Mock buffer to serialize
-               BufferResponse resp = new BufferResponse(buffer, 
sequenceNumber, receivingChannelId, backlog);
+               BufferResponse resp = new BufferResponse(
+                               buffer,
+                               sequenceNumber,
+                               receivingChannel.getInputChannelId(),
+                               backlog);
 
                ByteBuf serialized = 
resp.write(UnpooledByteBufAllocator.DEFAULT);
 
                // Skip general header bytes
                serialized.readBytes(NettyMessage.FRAME_HEADER_LENGTH);
 
+
                // Deserialize the bytes again. We have to go this way, because 
we only partly deserialize
                // the header of the response and wait for a buffer from the 
buffer pool to copy the payload
                // data into.
-               BufferResponse deserialized = 
BufferResponse.readFrom(serialized);
+               NetworkBufferAllocator allocator = new 
NetworkBufferAllocator(clientHandler);
+               BufferResponse deserialized = 
BufferResponse.readFrom(serialized, allocator);
+
+               if (deserialized.getBuffer() != null) {
 
 Review comment:
   Currently the content is not used, however, I think we might keep the 
copying to keep consistent with previous behaviors, and keeping the 
deserialized buffer the same to the given buffer seems to be more consistent 
with the intuition when viewing this method.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to