codelipenghui commented on code in PR #21684:
URL: https://github.com/apache/pulsar/pull/21684#discussion_r1418819587


##########
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java:
##########
@@ -19,36 +19,61 @@
 package org.apache.pulsar.common.protocol;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.codec.ProtocolDetectionResult;
 import io.netty.handler.codec.ProtocolDetectionState;
 import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
 import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * Decoder that added whether a new connection is prefixed with the 
ProxyProtocol.
  * More about the ProxyProtocol see: 
http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt.
  */
+@Slf4j
 public class OptionalProxyProtocolDecoder extends ChannelInboundHandlerAdapter 
{
 
     public static final String NAME = "optional-proxy-protocol-decoder";
 
+    private CompositeByteBuf cumulatedByteBuf;
+
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
         if (msg instanceof ByteBuf) {
-            ProtocolDetectionResult<HAProxyProtocolVersion> result =
-                    HAProxyMessageDecoder.detectProtocol((ByteBuf) msg);
-            // should accumulate data if need more data to detect the protocol
+            // Combine cumulated buffers.
+            ByteBuf buf = (ByteBuf) msg;
+            if (cumulatedByteBuf != null) {
+                buf = cumulatedByteBuf.addComponent(true, buf);
+            }
+
+            ProtocolDetectionResult<HAProxyProtocolVersion> result = 
HAProxyMessageDecoder.detectProtocol(buf);
             if (result.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
+                // Accumulate data if need more data to detect the protocol.
+                if (cumulatedByteBuf == null) {
+                    cumulatedByteBuf = new CompositeByteBuf(ctx.alloc(), 
false, 12, buf);

Review Comment:
   It's better to avoid using a magic number (`12`) here. Could you extract it 
into a named constant, or maybe get the value from the HAProxy classes?



##########
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java:
##########
@@ -19,36 +19,61 @@
 package org.apache.pulsar.common.protocol;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.codec.ProtocolDetectionResult;
 import io.netty.handler.codec.ProtocolDetectionState;
 import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
 import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * Decoder that added whether a new connection is prefixed with the 
ProxyProtocol.
  * More about the ProxyProtocol see: 
http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt.
  */
+@Slf4j
 public class OptionalProxyProtocolDecoder extends ChannelInboundHandlerAdapter 
{
 
     public static final String NAME = "optional-proxy-protocol-decoder";
 
+    private CompositeByteBuf cumulatedByteBuf;
+
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
         if (msg instanceof ByteBuf) {
-            ProtocolDetectionResult<HAProxyProtocolVersion> result =
-                    HAProxyMessageDecoder.detectProtocol((ByteBuf) msg);
-            // should accumulate data if need more data to detect the protocol
+            // Combine cumulated buffers.
+            ByteBuf buf = (ByteBuf) msg;
+            if (cumulatedByteBuf != null) {
+                buf = cumulatedByteBuf.addComponent(true, buf);
+            }
+
+            ProtocolDetectionResult<HAProxyProtocolVersion> result = 
HAProxyMessageDecoder.detectProtocol(buf);
             if (result.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
+                // Accumulate data if need more data to detect the protocol.
+                if (cumulatedByteBuf == null) {
+                    cumulatedByteBuf = new CompositeByteBuf(ctx.alloc(), 
false, 12, buf);
+                }
                 return;
             }
+            cumulatedByteBuf = null;
 
             if (result.state() == ProtocolDetectionState.DETECTED) {
                 ctx.pipeline().addAfter(NAME, null, new 
HAProxyMessageDecoder());
                 ctx.pipeline().remove(this);
             }
+            super.channelRead(ctx, buf);
+        } else {
+            super.channelRead(ctx, msg);
+        }
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        if (cumulatedByteBuf != null) {
+            log.info("Release cumulated byte buffer when channel inactive.");
+            cumulatedByteBuf = null;
         }
-        super.channelRead(ctx, msg);
+        ctx.fireChannelInactive();

Review Comment:
   Shouldn't this be `super.channelInactive(ctx)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to