zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r376215573
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ##########
 @@ -283,44 +209,54 @@ protected Object decode(ChannelHandlerContext ctx, 
ByteBuf in) throws Exception
 
                final boolean isCompressed;
 
-               private BufferResponse(
-                               ByteBuf buffer,
-                               boolean isBuffer,
-                               boolean isCompressed,
+               final int bufferSize;
+
+               BufferResponse(
+                               Buffer buffer,
                                int sequenceNumber,
                                InputChannelID receiverId,
                                int backlog) {
-                       this.buffer = checkNotNull(buffer);
-                       this.isBuffer = isBuffer;
-                       this.isCompressed = isCompressed;
-                       this.sequenceNumber = sequenceNumber;
-                       this.receiverId = checkNotNull(receiverId);
-                       this.backlog = backlog;
+
+                       this(checkNotNull(buffer),
+                               buffer.isBuffer(),
+                               buffer.isCompressed(),
+                               sequenceNumber,
+                               receiverId,
+                               backlog,
+                               buffer.readableBytes());
                }
 
-               BufferResponse(
-                               Buffer buffer,
+               private BufferResponse(
+                               @Nullable Buffer buffer,
+                               boolean isBuffer,
+                               boolean isCompressed,
                                int sequenceNumber,
                                InputChannelID receiverId,
-                               int backlog) {
-                       this.buffer = checkNotNull(buffer).asByteBuf();
-                       this.isBuffer = buffer.isBuffer();
-                       this.isCompressed = buffer.isCompressed();
+                               int backlog,
+                               int bufferSize) {
+                       this.buffer = buffer;
+                       this.isBuffer = isBuffer;
+                       this.isCompressed = isCompressed;
                        this.sequenceNumber = sequenceNumber;
                        this.receiverId = checkNotNull(receiverId);
                        this.backlog = backlog;
+                       this.bufferSize = bufferSize;
                }
 
                boolean isBuffer() {
                        return isBuffer;
                }
 
-               ByteBuf getNettyBuffer() {
+               public Buffer getBuffer() {
                        return buffer;
                }
 
+               public int getBufferSize() {
+                       return bufferSize;
+               }
+
                void releaseBuffer() {
-                       buffer.release();
+                       buffer.recycleBuffer();
 
 Review comment:
   Actually this might bring potential NPE issue as I mentioned above.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to