lhotari commented on code in PR #22760:
URL: https://github.com/apache/pulsar/pull/22760#discussion_r1610605439


##########
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java:
##########
@@ -122,38 +123,22 @@ public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise)
                 // ByteBuf are automatically released after a write. If the 
ByteBufPair ref count is increased and it
                 // gets written multiple times, the individual buffers 
refcount should be reflected as well.
                 try {
-                    ctx.write(b.getFirst().retainedDuplicate(), 
ctx.voidPromise());
-                    ctx.write(b.getSecond().retainedDuplicate(), promise);
+                    ctx.write(readOnlyRetainedDuplicate(b.getFirst()), 
ctx.voidPromise());
+                    ctx.write(readOnlyRetainedDuplicate(b.getSecond()), 
promise);
                 } finally {
                     ReferenceCountUtil.safeRelease(b);
                 }
             } else {
                 ctx.write(msg, promise);
             }
         }
-    }
-
-    @Sharable
-    @SuppressWarnings("checkstyle:JavadocType")
-    public static class CopyingEncoder extends ChannelOutboundHandlerAdapter {
-        @Override
-        public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise) throws Exception {
-            if (msg instanceof ByteBufPair) {
-                ByteBufPair b = (ByteBufPair) msg;
 
-                // Some handlers in the pipeline will modify the bytebufs 
passed in to them (i.e. SslHandler).
-                // For these handlers, we need to pass a copy of the buffers 
as the source buffers may be cached
-                // for multiple requests.
-                try {
-                    ctx.write(b.getFirst().copy(), ctx.voidPromise());
-                    ctx.write(b.getSecond().copy(), promise);
-                } finally {
-                    ReferenceCountUtil.safeRelease(b);
-                }
-            } else {
-                ctx.write(msg, promise);
-            }
+        // .asReadOnly() is needed to prevent SslHandler from modifying the 
input buffers.
+        private static ByteBuf readOnlyRetainedDuplicate(ByteBuf buf) {
+            // If the buffer is already read-only, .asReadOnly() will return 
the same buffer.
+            // That's why the additional .retainedDuplicate() is needed to 
ensure that the returned buffer
+            // has independent readIndex and writeIndex.
+            return buf.asReadOnly().retainedDuplicate();

Review Comment:
   > We should return false immediately if cumulation is not writable
   
   In this case, isWritable returns true for a buffer for a wrapped read only 
buffer. This is surprising and the reason why this read only buffers aren't 
supported if there's another wrapper. It correctly returns true for isReadOnly.
   
   One possible workaround could be to use `Unpooled.unmodifiableBuffer` to add 
the readonly wrapper so that it's always the "top most" wrapper. That would 
also avoid the need for the extra `duplicate()` wrapper.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to