Copilot commented on code in PR #10514:
URL: https://github.com/apache/rocketmq/pull/10514#discussion_r3410846477


##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingContext.java:
##########
@@ -20,6 +20,8 @@
 import java.util.List;
 
 public class TopicQueueMappingContext  {
+    public static final TopicQueueMappingContext EMPTY = new TopicQueueMappingContext(null, null, null, null, null);
+
     private String topic;
     private Integer globalId;
     private TopicQueueMappingDetail mappingDetail;

Review Comment:
   Introducing a shared `EMPTY` singleton is risky if 
`TopicQueueMappingContext` is mutable (e.g., has setters or exposes mutable 
internals), because any accidental mutation becomes a cross-thread/global side 
effect. Consider making the class immutable (final fields, no setters) or 
replacing `EMPTY` with a factory like `empty()` that returns a new instance (or 
a truly immutable instance) to prevent shared-state bugs.
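One way to read the suggestion above is sketched below. `ImmutableMappingContext` and its three fields are hypothetical stand-ins chosen for illustration; the real `TopicQueueMappingContext` has a different field set, so treat this as a minimal sketch of the "final fields, no setters" option rather than a drop-in patch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for TopicQueueMappingContext.
// The field names and types are assumptions made for this sketch.
final class ImmutableMappingContext {
    // Safe to share across threads: all fields are final and the list is unmodifiable.
    static final ImmutableMappingContext EMPTY = new ImmutableMappingContext(null, null, null);

    private final String topic;
    private final Integer globalId;
    private final List<String> mappingItems;

    ImmutableMappingContext(String topic, Integer globalId, List<String> mappingItems) {
        this.topic = topic;
        this.globalId = globalId;
        // Defensive copy so later changes to the caller's list cannot leak in.
        this.mappingItems = mappingItems == null
            ? Collections.<String>emptyList()
            : Collections.unmodifiableList(new ArrayList<>(mappingItems));
    }

    String getTopic() {
        return topic;
    }

    Integer getGlobalId() {
        return globalId;
    }

    List<String> getMappingItems() {
        return mappingItems;
    }
}
```

With no setters and an unmodifiable list, an accidental write through `EMPTY` fails fast with `UnsupportedOperationException` instead of silently changing shared state.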



##########
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java:
##########
@@ -581,13 +581,17 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
             if (channel.isWritable()) {
                 if (!channel.config().isAutoRead()) {
                     channel.config().setAutoRead(true);
-                    log.info("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}",
-                        RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable());
+                    if (log.isDebugEnabled()) {
+                        log.debug("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}",
+                            RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable());
+                    }
                 }
             } else {
                 channel.config().setAutoRead(false);
-                log.warn("Channel[{}] auto-read is disabled, bytes to drain before it turns writable: {}",
-                    RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeWritable());
+                if (log.isDebugEnabled()) {
+                    log.debug("Channel[{}] auto-read is disabled, bytes to drain before it turns writable: {}",
+                        RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeWritable());
+                }

Review Comment:
   This changes the operational signal from `info`/`warn` to `debug` for 
channel writability/backpressure events. If these logs were relied on for 
diagnosing pressure/slow consumers in production, the downgrade may reduce 
observability. Consider keeping at least the auto-read disabled event at `warn` 
(or making it configurable) while still guarding expensive argument evaluation.
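The trade-off described above could look roughly like the following. This sketch swaps in `java.util.logging` so it runs without the project's logging dependencies; `WritabilityLogging`, `logWritabilityChange`, and the `remoteAddr` supplier are hypothetical names, and the supplier stands in for the `RemotingHelper.parseChannelRemoteAddr(channel)` call.

```java
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper; java.util.logging is used here only as a stand-in
// for the project's own logger.
final class WritabilityLogging {
    static void logWritabilityChange(Logger log, boolean writable,
                                     Supplier<String> remoteAddr, long pendingBytes) {
        if (writable) {
            // Recovery is routine, so it is logged at a debug-like level.
            // The message supplier runs only if FINE is enabled.
            log.log(Level.FINE, () -> "Channel[" + remoteAddr.get()
                + "] turns writable, bytes before un-writable: " + pendingBytes);
        } else {
            // Backpressure stays visible in production at WARNING.
            // The address is still resolved lazily, inside the supplier.
            log.log(Level.WARNING, () -> "Channel[" + remoteAddr.get()
                + "] auto-read is disabled, bytes to drain: " + pendingBytes);
        }
    }
}
```

The lazy message supplier plays the role of the `isDebugEnabled()` guard: the address lookup is skipped whenever the level is filtered out.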



##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java:
##########
@@ -54,6 +57,12 @@ public class RemotingCommand {
     private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP =
         new HashMap<>();
     private static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<>();
+    // Caches the no-arg constructor of each CommandCustomHeader class.
+    // Why: Class.getDeclaredConstructor() copies the Constructor object on every call
+    // (sample showed ~70MB of Constructor allocations during a 60s benchmark).
+    // The set of header classes is fixed at startup, so ConcurrentHashMap.computeIfAbsent
+    // pays the reflective lookup once per class and reuses the cached Constructor thereafter.
+    private static final Map<Class<?>, Constructor<?>> HEADER_CTOR_CACHE = new ConcurrentHashMap<>();

Review Comment:
   The comment states `ConcurrentHashMap.computeIfAbsent`, but the 
implementation uses a manual `get` + `putIfAbsent`. Either update the comment 
to match the code, or switch to `computeIfAbsent` to reflect the described 
behavior and reduce duplicate reflective lookups under contention.
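A minimal sketch of the `computeIfAbsent` variant is shown below, assuming a standalone cache class. `HeaderCtorCache`, `lookup`, and `SampleHeader` are hypothetical names for illustration, and wrapping checked exceptions in `IllegalStateException` is a choice made for this sketch, not the PR's behavior.

```java
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical standalone version of the constructor cache.
final class HeaderCtorCache {
    private static final Map<Class<?>, Constructor<?>> CACHE = new ConcurrentHashMap<>();

    // Illustrative header type with a no-arg constructor.
    public static class SampleHeader {
        public SampleHeader() {
        }
    }

    // computeIfAbsent runs the reflective lookup at most once per class,
    // even when several threads ask for the same class concurrently.
    static Constructor<?> lookup(Class<?> cls) {
        return CACHE.computeIfAbsent(cls, c -> {
            try {
                return c.getDeclaredConstructor();
            } catch (NoSuchMethodException e) {
                // Lambdas cannot throw checked exceptions, so wrap it.
                throw new IllegalStateException("No no-arg constructor on " + c.getName(), e);
            }
        });
    }

    static <T> T newInstance(Class<T> cls) {
        try {
            return cls.cast(lookup(cls).newInstance());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + cls.getName(), e);
        }
    }
}
```

If the mapping function throws, `computeIfAbsent` records no entry, so a failed lookup is retried on the next call rather than cached.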



##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java:
##########
@@ -509,6 +529,29 @@ public ByteBuffer encodeHeader(final int bodyLength) {
         return result;
     }
 
+    public ByteBuffer fastEncodeHeaderAsBuffer(final int bodyLength) {
+        ByteBuf buf = Unpooled.buffer(192);
+        int beginIndex = buf.writerIndex();
+        buf.writeLong(0);
+        int headerSize;
+        if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
+            if (customHeader != null && !(customHeader instanceof FastCodesHeader)) {
+                this.makeCustomHeaderToNet();
+            }
+            headerSize = RocketMQSerializable.rocketMQProtocolEncode(this, buf);
+        } else {
+            this.makeCustomHeaderToNet();
+            byte[] header = RemotingSerializable.encode(this);
+            headerSize = header.length;
+            buf.writeBytes(header);
+        }
+        buf.setInt(beginIndex, 4 + headerSize + bodyLength);
+        buf.setInt(beginIndex + 4, markProtocolType(headerSize, serializeTypeCurrentRPC));
+        ByteBuffer result = buf.nioBuffer();
+        buf.release();
+        return result;
+    }

Review Comment:
   `fastEncodeHeaderAsBuffer` returns a `ByteBuffer` view backed by the Netty 
`ByteBuf`'s memory, then immediately calls `buf.release()`. With the current 
`Unpooled.buffer(192)` (an unpooled heap buffer) the backing array stays 
reachable through the returned view, but if the allocation ever moves to a 
direct or pooled buffer, the view would point to freed/reused memory, leading 
to corrupted headers or JVM crashes under load. Fix by copying into an 
independent `ByteBuffer` before releasing, or change the API to return a 
`ByteBuf` (or a retained buffer) so the caller can manage ref-counting safely.
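The "copy before releasing" option can be sketched with plain `java.nio`, which keeps the example free of Netty dependencies. `BufferCopies.independentCopy` is a hypothetical helper; in the method under review, the view passed in would be the result of `buf.nioBuffer()`, and the copy would have to happen before `buf.release()`.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating the copy step only.
final class BufferCopies {
    // Returns a heap ByteBuffer that owns its own bytes, so it stays valid
    // after the source's backing memory is released or reused.
    static ByteBuffer independentCopy(ByteBuffer view) {
        ByteBuffer copy = ByteBuffer.allocate(view.remaining());
        // duplicate() shares content but has its own position, so the
        // caller's buffer position is left untouched.
        copy.put(view.duplicate());
        copy.flip();
        return copy;
    }
}
```

The copy costs one extra allocation per header, which works against the goal of a fast path; returning the `ByteBuf` itself avoids that cost but moves the release responsibility to the caller.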



##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java:
##########
@@ -175,6 +183,18 @@ public static RemotingCommand createResponseCommand(int code, String remark,
         return cmd;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <T> T newHeaderInstance(Class<T> cls)
+        throws InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+        Constructor<?> ctor = HEADER_CTOR_CACHE.get(cls);
+        if (ctor == null) {
+            ctor = cls.getDeclaredConstructor();
+            ctor.setAccessible(true);
+            HEADER_CTOR_CACHE.putIfAbsent(cls, ctor);
+        }
+        return (T) ctor.newInstance();

Review Comment:
   `ctor.setAccessible(true)` can fail at runtime on newer JDKs / JPMS strong 
encapsulation (e.g., throwing `InaccessibleObjectException`). Since this method 
is invoked on the hot path for decoding/creating commands, a runtime failure 
here could break remoting entirely. Prefer requiring a public no-arg 
constructor and using `getConstructor()` (no `setAccessible`), or catch and 
wrap these reflective-access failures with a clear exception so operators can 
diagnose module-access issues.
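A minimal sketch of the `getConstructor()` option follows, assuming every header class exposes a public no-arg constructor. `PublicCtorFactory` is a hypothetical name, and the choice of `IllegalStateException` as the wrapper is an assumption for illustration.

```java
import java.lang.reflect.Constructor;

// Hypothetical factory that avoids setAccessible entirely.
final class PublicCtorFactory {
    static <T> T newInstance(Class<T> cls) {
        try {
            // getConstructor() only finds public constructors, so no
            // setAccessible(true) call is needed.
            Constructor<T> ctor = cls.getConstructor();
            return ctor.newInstance();
        } catch (ReflectiveOperationException | SecurityException e) {
            // One clear message covers a missing constructor, an inaccessible
            // class, and failures thrown by the constructor itself.
            throw new IllegalStateException("Cannot instantiate header class "
                + cls.getName() + "; it needs an accessible public no-arg constructor", e);
        }
    }
}
```

For example, `PublicCtorFactory.newInstance(StringBuilder.class)` returns an empty builder, while a class without a public no-arg constructor such as `Integer` fails with the wrapped `NoSuchMethodException` as the cause.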



##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java:
##########
@@ -635,12 +681,12 @@ public void setSerializeTypeCurrentRPC(SerializeType serializeTypeCurrentRPC) {
         this.serializeTypeCurrentRPC = serializeTypeCurrentRPC;
     }
 
-    public Stopwatch getProcessTimer() {
-        return processTimer;
+    public long processTimerElapsedMs() {
+        return (System.nanoTime() - processTimerNanos) / 1_000_000;
     }
 
-    public void setProcessTimer(Stopwatch processTimer) {
-        this.processTimer = processTimer;
+    public void setProcessTimerNanos(long nanos) {
+        this.processTimerNanos = nanos;
     }

Review Comment:
   This replaces the previous `getProcessTimer()` / 
`setProcessTimer(Stopwatch)` API with new methods and different semantics 
(timestamp vs Stopwatch). If `RemotingCommand` is part of a public/stable API 
surface, this is a source/binary breaking change for downstream code. Consider 
keeping the old methods as deprecated adapters (internally backed by 
`processTimerNanos`) for at least one release cycle to preserve compatibility.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
