zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r376215513
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ##########
 @@ -188,90 +186,18 @@ public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise)
                }
        }
 
-       /**
-        * Message decoder based on netty's {@link 
LengthFieldBasedFrameDecoder} but avoiding the
-        * additional memory copy inside {@link 
#extractFrame(ChannelHandlerContext, ByteBuf, int, int)}
-        * since we completely decode the {@link ByteBuf} inside {@link 
#decode(ChannelHandlerContext,
-        * ByteBuf)} and will not re-use it afterwards.
-        *
-        * <p>The frame-length encoder will be based on this transmission 
scheme created by {@link NettyMessage#allocateBuffer(ByteBufAllocator, byte, 
int)}:
-        * <pre>
-        * +------------------+------------------+--------++----------------+
-        * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) || CUSTOM MESSAGE |
-        * +------------------+------------------+--------++----------------+
-        * </pre>
-        */
-       static class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {
-               /**
-                * Creates a new message decoded with the required frame 
properties.
-                */
-               NettyMessageDecoder() {
-                       super(Integer.MAX_VALUE, 0, 4, -4, 4);
-               }
-
-               @Override
-               protected Object decode(ChannelHandlerContext ctx, ByteBuf in) 
throws Exception {
-                       ByteBuf msg = (ByteBuf) super.decode(ctx, in);
-                       if (msg == null) {
-                               return null;
-                       }
-
-                       try {
-                               int magicNumber = msg.readInt();
-
-                               if (magicNumber != MAGIC_NUMBER) {
-                                       throw new IllegalStateException(
-                                               "Network stream corrupted: 
received incorrect magic number.");
-                               }
-
-                               byte msgId = msg.readByte();
-
-                               final NettyMessage decodedMsg;
-                               switch (msgId) {
-                                       case BufferResponse.ID:
-                                               decodedMsg = 
BufferResponse.readFrom(msg);
-                                               break;
-                                       case PartitionRequest.ID:
-                                               decodedMsg = 
PartitionRequest.readFrom(msg);
-                                               break;
-                                       case TaskEventRequest.ID:
-                                               decodedMsg = 
TaskEventRequest.readFrom(msg, getClass().getClassLoader());
-                                               break;
-                                       case ErrorResponse.ID:
-                                               decodedMsg = 
ErrorResponse.readFrom(msg);
-                                               break;
-                                       case CancelPartitionRequest.ID:
-                                               decodedMsg = 
CancelPartitionRequest.readFrom(msg);
-                                               break;
-                                       case CloseRequest.ID:
-                                               decodedMsg = 
CloseRequest.readFrom(msg);
-                                               break;
-                                       case AddCredit.ID:
-                                               decodedMsg = 
AddCredit.readFrom(msg);
-                                               break;
-                                       default:
-                                               throw new ProtocolException(
-                                                       "Received unknown 
message from producer: " + msg);
-                               }
-
-                               return decodedMsg;
-                       } finally {
-                               // ByteToMessageDecoder cleanup (only the 
BufferResponse holds on to the decoded
-                               // msg but already retain()s the buffer once)
-                               msg.release();
-                       }
-               }
-       }
-
        // 
------------------------------------------------------------------------
        // Server responses
        // 
------------------------------------------------------------------------
 
        static class BufferResponse extends NettyMessage {
 
-               private static final byte ID = 0;
+               static final byte ID = 0;
 
-               final ByteBuf buffer;
+               // receiver ID (16), sequence number (4), backlog (4), isBuffer 
(1), isCompressed (1), buffer size (4)
+               static final int MESSAGE_HEADER_LENGTH = 16 + 4 + 4 + 1 + 1 + 4;
+
+               final Buffer buffer;
 
 Review comment:
   This var should be annotated with `Nullable`, but I do not suggest bringing 
nullable var if not very necessary refer to 
https://github.com/apache/flink/pull/7368#pullrequestreview-354930094. 
Otherwise every usage should judge the nullable condition to avoid NPE. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to