wang-jiahua commented on code in PR #10514:
URL: https://github.com/apache/rocketmq/pull/10514#discussion_r3410872703


##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java:
##########
@@ -509,6 +529,29 @@ public ByteBuffer encodeHeader(final int bodyLength) {
         return result;
     }
 
+    public ByteBuffer fastEncodeHeaderAsBuffer(final int bodyLength) {
+        ByteBuf buf = Unpooled.buffer(192);
+        int beginIndex = buf.writerIndex();
+        buf.writeLong(0);
+        int headerSize;
+        if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
+            if (customHeader != null && !(customHeader instanceof FastCodesHeader)) {
+                this.makeCustomHeaderToNet();
+            }
+            headerSize = RocketMQSerializable.rocketMQProtocolEncode(this, buf);
+        } else {
+            this.makeCustomHeaderToNet();
+            byte[] header = RemotingSerializable.encode(this);
+            headerSize = header.length;
+            buf.writeBytes(header);
+        }
+        buf.setInt(beginIndex, 4 + headerSize + bodyLength);
+        buf.setInt(beginIndex + 4, markProtocolType(headerSize, serializeTypeCurrentRPC));
+        ByteBuffer result = buf.nioBuffer();
+        buf.release();
+        return result;
+    }

Review Comment:
   `Unpooled.buffer(192)` creates an `UnpooledHeapByteBuf` (heap-allocated). Its 
`nioBuffer()` returns a `ByteBuffer` that wraps the backing `byte[]`, so it is a 
view of the heap array, not of Netty reference-counted memory. After 
`buf.release()`, the heap `byte[]` remains valid because it is GC-managed. 
Releasing early would be a concern only for a direct `ByteBuf`, where 
`release()` frees native memory; a heap `ByteBuf` is safe here.



##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java:
##########
@@ -54,6 +57,12 @@ public class RemotingCommand {
     private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP =
         new HashMap<>();
     private static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<>();
+    // Caches the no-arg constructor of each CommandCustomHeader class.
+    // Why: Class.getDeclaredConstructor() copies the Constructor object on every call
+    // (sample showed ~70MB of Constructor allocations during a 60s benchmark).
+    // The set of header classes is fixed at startup, so ConcurrentHashMap.computeIfAbsent
+    // pays the reflective lookup once per class and reuses the cached Constructor thereafter.
+    private static final Map<Class<?>, Constructor<?>> HEADER_CTOR_CACHE = new ConcurrentHashMap<>();

Review Comment:
   Good catch: the code comment is misleading. I will update it to accurately 
describe the `get` + `putIfAbsent` pattern that the code actually uses.
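
For readers following the thread, here is a minimal, JDK-only sketch of the `get` + `putIfAbsent` caching pattern mentioned above. It is not the PR's code: `CtorCacheSketch`, `DemoHeader`, and `cacheSize()` are hypothetical names, and reusing the value returned by `putIfAbsent` is an optional addition that the diff does not include.

```java
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class CtorCacheSketch {
    // Hypothetical stand-in for a CommandCustomHeader subclass.
    static final class DemoHeader {
        DemoHeader() { }
    }

    private static final Map<Class<?>, Constructor<?>> CACHE = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    static <T> T newInstance(Class<T> cls) throws ReflectiveOperationException {
        Constructor<?> ctor = CACHE.get(cls);
        if (ctor == null) {
            // Unlike computeIfAbsent, two threads can both miss here and both
            // perform the reflective lookup; only one result is kept in the map.
            ctor = cls.getDeclaredConstructor();
            ctor.setAccessible(true);
            Constructor<?> winner = CACHE.putIfAbsent(cls, ctor);
            if (winner != null) {
                ctor = winner; // optional: reuse the instance that won the race
            }
        }
        return (T) ctor.newInstance();
    }

    static int cacheSize() {
        return CACHE.size();
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        DemoHeader first = newInstance(DemoHeader.class);
        DemoHeader second = newInstance(DemoHeader.class);
        System.out.println("distinct instances: " + (first != second));
        System.out.println("cached constructors: " + cacheSize());
    }
}
```

The lookup may run more than once per class under contention, which is harmless here because any `Constructor` obtained for the same class is interchangeable.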



##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java:
##########
@@ -635,12 +681,12 @@ public void setSerializeTypeCurrentRPC(SerializeType serializeTypeCurrentRPC) {
         this.serializeTypeCurrentRPC = serializeTypeCurrentRPC;
     }
 
-    public Stopwatch getProcessTimer() {
-        return processTimer;
+    public long processTimerElapsedMs() {
+        return (System.nanoTime() - processTimerNanos) / 1_000_000;
     }
 
-    public void setProcessTimer(Stopwatch processTimer) {
-        this.processTimer = processTimer;
+    public void setProcessTimerNanos(long nanos) {
+        this.processTimerNanos = nanos;
     }

Review Comment:
   This API change is intentional. The old 
`getProcessTimer()`/`setProcessTimer(Stopwatch)` methods required allocating a 
Guava `Stopwatch` per RPC. The new 
`setProcessTimerNanos(long)`/`processTimerElapsedMs()` methods use a primitive 
`long` with zero allocation. The only callers are `NettyDecoder` (updated in 
this same PR) and `NettyRemotingAbstract` (which calls 
`processTimerElapsedMs()`). No external callers depend on the old Stopwatch API.
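
A minimal sketch of the allocation-free timing described above, using only the JDK. `ProcessTimerSketch` and `elapsedMsAt(long)` are hypothetical names introduced so the arithmetic can be checked deterministically; only `setProcessTimerNanos` and `processTimerElapsedMs` mirror the diff.

```java
final class ProcessTimerSketch {
    // Start timestamp from System.nanoTime(); a primitive field, so no per-RPC object.
    private long processTimerNanos;

    void setProcessTimerNanos(long nanos) {
        this.processTimerNanos = nanos;
    }

    long processTimerElapsedMs() {
        return elapsedMsAt(System.nanoTime());
    }

    // Hypothetical helper: same formula with an explicit "now".
    // Subtracting two nanoTime values stays correct even if the counter wraps.
    long elapsedMsAt(long nowNanos) {
        return (nowNanos - processTimerNanos) / 1_000_000;
    }

    public static void main(String[] args) throws InterruptedException {
        ProcessTimerSketch timer = new ProcessTimerSketch();
        timer.setProcessTimerNanos(System.nanoTime());
        Thread.sleep(20);
        System.out.println("elapsed ms: " + timer.processTimerElapsedMs());
    }
}
```

Note that the integer division truncates, so sub-millisecond durations report as 0.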



##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java:
##########
@@ -175,6 +183,18 @@ public static RemotingCommand createResponseCommand(int code, String remark,
         return cmd;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <T> T newHeaderInstance(Class<T> cls)
+        throws InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+        Constructor<?> ctor = HEADER_CTOR_CACHE.get(cls);
+        if (ctor == null) {
+            ctor = cls.getDeclaredConstructor();
+            ctor.setAccessible(true);
+            HEADER_CTOR_CACHE.putIfAbsent(cls, ctor);
+        }
+        return (T) ctor.newInstance();

Review Comment:
   The cached constructors are for `CommandCustomHeader` subclasses. These are 
user-defined classes in `org.apache.rocketmq.remoting.protocol.header.*`, not 
classes in JDK internal modules, so calling `setAccessible(true)` on them does 
not trigger JPMS restrictions. RocketMQ already uses this pattern extensively 
in `decodeCommandCustomHeader()` (the existing code calls 
`getDeclaredConstructor().newInstance()` without JPMS issues).



##########
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java:
##########
@@ -581,13 +581,17 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
             if (channel.isWritable()) {
                 if (!channel.config().isAutoRead()) {
                     channel.config().setAutoRead(true);
-                    log.info("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}",
-                        RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable());
+                    if (log.isDebugEnabled()) {
+                        log.debug("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}",
+                            RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable());
+                    }

Review Comment:
   This is an intentional tradeoff. Under high-throughput load, 
`channelWritabilityChanged` fires ~900 times/sec, and each call triggers 
`parseChannelRemoteAddr()`, String concatenation, and an AsyncAppender enqueue. 
This was measured as a significant per-RPC allocation source. Logging at DEBUG 
level behind an `isDebugEnabled()` guard preserves the information for 
troubleshooting while eliminating the hot-path cost. The same change was made 
in PR #10491.
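
To illustrate why the guard matters, here is a minimal sketch that swaps in `java.util.logging` for RocketMQ's logger so it runs on the JDK alone. `GuardedLogSketch`, `describeChannel`, and `FORMAT_CALLS` are hypothetical; `describeChannel` stands in for `RemotingHelper.parseChannelRemoteAddr(channel)`, and the counter only exists to show that the argument is never computed when the level is disabled.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

final class GuardedLogSketch {
    static final Logger LOG = Logger.getLogger("GuardedLogSketch");
    // Counts how often the (assumed expensive) argument is actually built.
    static final AtomicInteger FORMAT_CALLS = new AtomicInteger();

    // Hypothetical stand-in for RemotingHelper.parseChannelRemoteAddr(channel).
    static String describeChannel(String addr) {
        FORMAT_CALLS.incrementAndGet();
        return "Channel[" + addr + "]";
    }

    static void onWritable(String addr, long bytesBeforeUnwritable) {
        // Without this guard, describeChannel() would run on every event,
        // even when the message is discarded by the log level.
        if (LOG.isLoggable(Level.FINE)) {
            LOG.log(Level.FINE, "{0} turns writable, bytes before un-writable: {1}",
                new Object[] {describeChannel(addr), bytesBeforeUnwritable});
        }
    }

    public static void main(String[] args) {
        LOG.setLevel(Level.INFO);
        onWritable("127.0.0.1:10911", 1024L);
        System.out.println("argument builds at INFO: " + FORMAT_CALLS.get());
        LOG.setLevel(Level.FINE);
        onWritable("127.0.0.1:10911", 1024L);
        System.out.println("argument builds at FINE: " + FORMAT_CALLS.get());
    }
}
```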



##########
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java:
##########
@@ -581,13 +581,17 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
             if (channel.isWritable()) {
                 if (!channel.config().isAutoRead()) {
                     channel.config().setAutoRead(true);
-                    log.info("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}",
-                        RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable());
+                    if (log.isDebugEnabled()) {
+                        log.debug("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}",
+                            RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable());
+                    }
                 }
             } else {
                 channel.config().setAutoRead(false);
-                log.warn("Channel[{}] auto-read is disabled, bytes to drain before it turns writable: {}",
-                    RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeWritable());
+                if (log.isDebugEnabled()) {
+                    log.debug("Channel[{}] auto-read is disabled, bytes to drain before it turns writable: {}",
+                        RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeWritable());
+                }

Review Comment:
   Same as above: this is an intentional log-level downgrade to eliminate 
hot-path allocation. See the reply on line 587.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

