Copilot commented on code in PR #10514:
URL: https://github.com/apache/rocketmq/pull/10514#discussion_r3410846477
##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingContext.java:
##########
@@ -20,6 +20,8 @@
import java.util.List;
public class TopicQueueMappingContext {
+ public static final TopicQueueMappingContext EMPTY = new
TopicQueueMappingContext(null, null, null, null, null);
+
private String topic;
private Integer globalId;
private TopicQueueMappingDetail mappingDetail;
Review Comment:
Introducing a shared `EMPTY` singleton is risky if
`TopicQueueMappingContext` is mutable (e.g., has setters or exposes mutable
internals), because any accidental mutation becomes a cross-thread/global side
effect. Consider making the class immutable (final fields, no setters) or
replacing `EMPTY` with a factory like `empty()` that returns a new instance (or
a truly immutable instance) to prevent shared-state bugs.
##########
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java:
##########
@@ -581,13 +581,17 @@ public void
channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
if (channel.isWritable()) {
if (!channel.config().isAutoRead()) {
channel.config().setAutoRead(true);
- log.info("Channel[{}] turns writable, bytes to buffer
before changing channel to un-writable: {}",
- RemotingHelper.parseChannelRemoteAddr(channel),
channel.bytesBeforeUnwritable());
+ if (log.isDebugEnabled()) {
+ log.debug("Channel[{}] turns writable, bytes to buffer
before changing channel to un-writable: {}",
+ RemotingHelper.parseChannelRemoteAddr(channel),
channel.bytesBeforeUnwritable());
+ }
}
} else {
channel.config().setAutoRead(false);
- log.warn("Channel[{}] auto-read is disabled, bytes to drain
before it turns writable: {}",
- RemotingHelper.parseChannelRemoteAddr(channel),
channel.bytesBeforeWritable());
+ if (log.isDebugEnabled()) {
+ log.debug("Channel[{}] auto-read is disabled, bytes to
drain before it turns writable: {}",
+ RemotingHelper.parseChannelRemoteAddr(channel),
channel.bytesBeforeWritable());
+ }
Review Comment:
This changes the operational signal from `info`/`warn` to `debug` for
channel writability/backpressure events. If these logs were relied on for
diagnosing pressure/slow consumers in production, the downgrade may reduce
observability. Consider keeping at least the auto-read disabled event at `warn`
(or making it configurable) while still guarding expensive argument evaluation.
##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java:
##########
@@ -54,6 +57,12 @@ public class RemotingCommand {
private static final Map<Class<? extends CommandCustomHeader>, Field[]>
CLASS_HASH_MAP =
new HashMap<>();
private static final Map<Class, String> CANONICAL_NAME_CACHE = new
HashMap<>();
+ // Caches the no-arg constructor of each CommandCustomHeader class.
+ // Why: Class.getDeclaredConstructor() copies the Constructor object on
every call
+ // (sample showed ~70MB of Constructor allocations during a 60s benchmark).
+ // The set of header classes is fixed at startup, so
ConcurrentHashMap.computeIfAbsent
+ // pays the reflective lookup once per class and reuses the cached
Constructor thereafter.
+ private static final Map<Class<?>, Constructor<?>> HEADER_CTOR_CACHE = new
ConcurrentHashMap<>();
Review Comment:
The comment states `ConcurrentHashMap.computeIfAbsent`, but the
implementation uses a manual `get` + `putIfAbsent`. Either update the comment
to match the code, or switch to `computeIfAbsent` to reflect the described
behavior and reduce duplicate reflective lookups under contention.
##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java:
##########
@@ -509,6 +529,29 @@ public ByteBuffer encodeHeader(final int bodyLength) {
return result;
}
+ public ByteBuffer fastEncodeHeaderAsBuffer(final int bodyLength) {
+ ByteBuf buf = Unpooled.buffer(192);
+ int beginIndex = buf.writerIndex();
+ buf.writeLong(0);
+ int headerSize;
+ if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
+ if (customHeader != null && !(customHeader instanceof
FastCodesHeader)) {
+ this.makeCustomHeaderToNet();
+ }
+ headerSize = RocketMQSerializable.rocketMQProtocolEncode(this,
buf);
+ } else {
+ this.makeCustomHeaderToNet();
+ byte[] header = RemotingSerializable.encode(this);
+ headerSize = header.length;
+ buf.writeBytes(header);
+ }
+ buf.setInt(beginIndex, 4 + headerSize + bodyLength);
+ buf.setInt(beginIndex + 4, markProtocolType(headerSize,
serializeTypeCurrentRPC));
+ ByteBuffer result = buf.nioBuffer();
+ buf.release();
+ return result;
+ }
Review Comment:
`fastEncodeHeaderAsBuffer` returns a `ByteBuffer` view backed by Netty
`ByteBuf` memory, then immediately calls `buf.release()`. This can return a
`ByteBuffer` pointing to freed/reused memory, leading to corrupted headers or
JVM crashes under load (especially for direct buffers / pooled allocators). Fix
by copying into an independent `ByteBuffer` before releasing, or change the API
to return a `ByteBuf` (or a retained buffer) so the caller can manage
ref-counting safely.
##########
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java:
##########
@@ -581,13 +581,17 @@ public void
channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
if (channel.isWritable()) {
if (!channel.config().isAutoRead()) {
channel.config().setAutoRead(true);
- log.info("Channel[{}] turns writable, bytes to buffer
before changing channel to un-writable: {}",
- RemotingHelper.parseChannelRemoteAddr(channel),
channel.bytesBeforeUnwritable());
+ if (log.isDebugEnabled()) {
+ log.debug("Channel[{}] turns writable, bytes to buffer
before changing channel to un-writable: {}",
+ RemotingHelper.parseChannelRemoteAddr(channel),
channel.bytesBeforeUnwritable());
+ }
Review Comment:
This changes the operational signal from `info`/`warn` to `debug` for
channel writability/backpressure events. If these logs were relied on for
diagnosing pressure/slow consumers in production, the downgrade may reduce
observability. Consider keeping at least the auto-read disabled event at `warn`
(or making it configurable) while still guarding expensive argument evaluation.
##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java:
##########
@@ -175,6 +183,18 @@ public static RemotingCommand createResponseCommand(int
code, String remark,
return cmd;
}
+ @SuppressWarnings("unchecked")
+ private static <T> T newHeaderInstance(Class<T> cls)
+ throws InstantiationException, IllegalAccessException,
InvocationTargetException, NoSuchMethodException {
+ Constructor<?> ctor = HEADER_CTOR_CACHE.get(cls);
+ if (ctor == null) {
+ ctor = cls.getDeclaredConstructor();
+ ctor.setAccessible(true);
+ HEADER_CTOR_CACHE.putIfAbsent(cls, ctor);
+ }
+ return (T) ctor.newInstance();
Review Comment:
`ctor.setAccessible(true)` can fail at runtime on newer JDKs / JPMS strong
encapsulation (e.g., throwing `InaccessibleObjectException`). Since this method
is invoked on the hot path for decoding/creating commands, a runtime failure
here could break remoting entirely. Prefer requiring a public no-arg
constructor and using `getConstructor()` (no `setAccessible`), or catch and
wrap these reflective-access failures with a clear exception so operators can
diagnose module-access issues.
##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java:
##########
@@ -635,12 +681,12 @@ public void setSerializeTypeCurrentRPC(SerializeType
serializeTypeCurrentRPC) {
this.serializeTypeCurrentRPC = serializeTypeCurrentRPC;
}
- public Stopwatch getProcessTimer() {
- return processTimer;
+ public long processTimerElapsedMs() {
+ return (System.nanoTime() - processTimerNanos) / 1_000_000;
}
- public void setProcessTimer(Stopwatch processTimer) {
- this.processTimer = processTimer;
+ public void setProcessTimerNanos(long nanos) {
+ this.processTimerNanos = nanos;
}
Review Comment:
This replaces the previous `getProcessTimer()` /
`setProcessTimer(Stopwatch)` API with new methods and different semantics
(timestamp vs Stopwatch). If `RemotingCommand` is part of a public/stable API
surface, this is a source/binary breaking change for downstream code. Consider
keeping the old methods as deprecated adapters (internally backed by
`processTimerNanos`) for at least one release cycle to preserve compatibility.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]