wang-jiahua commented on code in PR #10514:
URL: https://github.com/apache/rocketmq/pull/10514#discussion_r3410872703
##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java:
##########
@@ -509,6 +529,29 @@ public ByteBuffer encodeHeader(final int bodyLength) {
return result;
}
+ public ByteBuffer fastEncodeHeaderAsBuffer(final int bodyLength) {
+ ByteBuf buf = Unpooled.buffer(192);
+ int beginIndex = buf.writerIndex();
+ buf.writeLong(0);
+ int headerSize;
+ if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
+ if (customHeader != null && !(customHeader instanceof FastCodesHeader)) {
+ this.makeCustomHeaderToNet();
+ }
+ headerSize = RocketMQSerializable.rocketMQProtocolEncode(this, buf);
+ } else {
+ this.makeCustomHeaderToNet();
+ byte[] header = RemotingSerializable.encode(this);
+ headerSize = header.length;
+ buf.writeBytes(header);
+ }
+ buf.setInt(beginIndex, 4 + headerSize + bodyLength);
+ buf.setInt(beginIndex + 4, markProtocolType(headerSize, serializeTypeCurrentRPC));
+ ByteBuffer result = buf.nioBuffer();
+ buf.release();
+ return result;
+ }
Review Comment:
`Unpooled.buffer(192)` allocates a heap-backed `ByteBuf` (an
`UnpooledHeapByteBuf`). Its `nioBuffer()` returns a `ByteBuffer` that wraps the
underlying `byte[]`, so the result is backed by a GC-managed heap array, not by
Netty reference-counted native memory. After `buf.release()`, the `byte[]`
stays valid for as long as the returned `ByteBuffer` references it. Releasing
first would be a concern only for a direct `ByteBuf`, where release frees
native memory; the heap `ByteBuf` is safe here.
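
For readers following the hunk above, the back-filled 8-byte prefix can be
sketched with a plain JDK `ByteBuffer` instead of Netty. This is only an
illustration: `FramePrefixSketch` and `encodeHeader` are hypothetical names,
and the bit layout inside `markProtocolType` (serialize type code in the top
byte, header length in the low 24 bits) is an assumption, not something the
diff itself shows.

```java
import java.nio.ByteBuffer;

final class FramePrefixSketch {
    // Assumption: the serialize type code sits in the top byte and the
    // header length in the low 24 bits of the second prefix int.
    static int markProtocolType(int headerSize, byte serializeTypeCode) {
        return (serializeTypeCode << 24) | (headerSize & 0x00FFFFFF);
    }

    // Reserves 8 bytes, writes the header, then back-fills both prefix ints,
    // mirroring the writeLong(0) / setInt(...) sequence in the hunk.
    static ByteBuffer encodeHeader(byte[] header, int bodyLength, byte serializeTypeCode) {
        ByteBuffer buf = ByteBuffer.allocate(8 + header.length);
        buf.putLong(0L);                                  // placeholder for the two prefix ints
        buf.put(header);                                  // encoded header bytes
        buf.putInt(0, 4 + header.length + bodyLength);    // total length after this field
        buf.putInt(4, markProtocolType(header.length, serializeTypeCode));
        buf.flip();                                       // make the buffer readable from index 0
        return buf;
    }
}
```

The returned buffer holds only the prefix and header; the body is assumed to
be written separately by the caller, as `bodyLength` is passed in only to
compute the total length.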
##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java:
##########
@@ -54,6 +57,12 @@ public class RemotingCommand {
private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP =
new HashMap<>();
private static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<>();
+ // Caches the no-arg constructor of each CommandCustomHeader class.
+ // Why: Class.getDeclaredConstructor() copies the Constructor object on every call
+ // (sample showed ~70MB of Constructor allocations during a 60s benchmark).
+ // The set of header classes is fixed at startup, so ConcurrentHashMap.computeIfAbsent
+ // pays the reflective lookup once per class and reuses the cached Constructor thereafter.
+ private static final Map<Class<?>, Constructor<?>> HEADER_CTOR_CACHE = new ConcurrentHashMap<>();
Review Comment:
Good catch: the comment mentions `ConcurrentHashMap.computeIfAbsent`, but the
code uses `get` followed by `putIfAbsent`. I will update the comment to
describe that pattern accurately.
##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java:
##########
@@ -635,12 +681,12 @@ public void setSerializeTypeCurrentRPC(SerializeType serializeTypeCurrentRPC) {
this.serializeTypeCurrentRPC = serializeTypeCurrentRPC;
}
- public Stopwatch getProcessTimer() {
- return processTimer;
+ public long processTimerElapsedMs() {
+ return (System.nanoTime() - processTimerNanos) / 1_000_000;
}
- public void setProcessTimer(Stopwatch processTimer) {
- this.processTimer = processTimer;
+ public void setProcessTimerNanos(long nanos) {
+ this.processTimerNanos = nanos;
}
Review Comment:
This API change is intentional. The old
`getProcessTimer()`/`setProcessTimer(Stopwatch)` methods allocated a Guava
`Stopwatch` per RPC. The new
`setProcessTimerNanos(long)`/`processTimerElapsedMs()` pair uses a primitive
`long` and allocates nothing. The only callers are `NettyDecoder` (updated in
this same PR) and `NettyRemotingAbstract` (which calls
`processTimerElapsedMs()`). No external callers depend on the old Stopwatch API.
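
A minimal sketch of the primitive-timer idea, for illustration only.
`ProcessTimerSketch` and the `elapsedMs(long)` helper are hypothetical; the
helper exists only so the arithmetic can be checked with fixed inputs.

```java
final class ProcessTimerSketch {
    // Start timestamp from System.nanoTime(); a plain long, so no object is allocated per RPC.
    private long processTimerNanos;

    void setProcessTimerNanos(long nanos) {
        this.processTimerNanos = nanos;
    }

    // Elapsed wall time in milliseconds since the recorded start.
    long processTimerElapsedMs() {
        return elapsedMs(System.nanoTime());
    }

    // Hypothetical helper: same arithmetic with an explicit "now" value.
    long elapsedMs(long nowNanos) {
        return (nowNanos - processTimerNanos) / 1_000_000;
    }
}
```

The start value has to come from `System.nanoTime()` as well, since nanoTime
readings are only meaningful relative to each other.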
##########
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java:
##########
@@ -175,6 +183,18 @@ public static RemotingCommand createResponseCommand(int code, String remark,
return cmd;
}
+ @SuppressWarnings("unchecked")
+ private static <T> T newHeaderInstance(Class<T> cls)
+ throws InstantiationException, IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+ Constructor<?> ctor = HEADER_CTOR_CACHE.get(cls);
+ if (ctor == null) {
+ ctor = cls.getDeclaredConstructor();
+ ctor.setAccessible(true);
+ HEADER_CTOR_CACHE.putIfAbsent(cls, ctor);
+ }
+ return (T) ctor.newInstance();
Review Comment:
The cached constructors are for `CommandCustomHeader` subclasses — these are
user-defined classes in `org.apache.rocketmq.remoting.protocol.header.*`, not
JDK internal modules. `setAccessible(true)` on user-code classes does not
trigger JPMS restrictions. RocketMQ already uses this pattern extensively in
`decodeCommandCustomHeader()` (the existing code calls
`getDeclaredConstructor().newInstance()` without JPMS issues).
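
The `get` + `putIfAbsent` caching pattern from the hunk above can be tried in
isolation with the sketch below. `CtorCacheSketch`, `DemoHeader`, and
`cachedCount()` are hypothetical names for illustration, and the sketch wraps
reflective failures in an unchecked exception, which the PR code does not do.

```java
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class CtorCacheSketch {
    private static final Map<Class<?>, Constructor<?>> CACHE = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    static <T> T newInstance(Class<T> cls) {
        try {
            Constructor<?> ctor = CACHE.get(cls);
            if (ctor == null) {
                // Two threads may both reach this point; each does the lookup once,
                // and putIfAbsent keeps whichever Constructor was stored first.
                ctor = cls.getDeclaredConstructor();
                ctor.setAccessible(true);
                CACHE.putIfAbsent(cls, ctor);
            }
            return (T) ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            // Sketch-only simplification: surface reflective failures unchecked.
            throw new IllegalStateException(e);
        }
    }

    static int cachedCount() {
        return CACHE.size();
    }

    // Hypothetical stand-in for a header class with a private no-arg constructor.
    static final class DemoHeader {
        int value = 7;

        private DemoHeader() {
        }
    }
}
```

Every call still creates a fresh instance; only the `Constructor` lookup is
reused across calls.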
##########
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java:
##########
@@ -581,13 +581,17 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
if (channel.isWritable()) {
if (!channel.config().isAutoRead()) {
channel.config().setAutoRead(true);
- log.info("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}",
- RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable());
+ if (log.isDebugEnabled()) {
+ log.debug("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}",
+ RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable());
+ }
Review Comment:
This is an intentional tradeoff. Under high-throughput load,
`channelWritabilityChanged` fires ~900 times/sec, and each call triggers
`parseChannelRemoteAddr()`, String concatenation, and an AsyncAppender enqueue.
This was measured as a significant per-RPC allocation source. Logging at DEBUG
behind an `isDebugEnabled()` guard preserves the information for
troubleshooting while eliminating the hot-path cost. The same change was made
in PR #10491.
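
The effect of the guard can be illustrated with the JDK's `java.util.logging`
in place of the project's logger. This is a swapped-in API for a
self-contained sketch: `GuardedLogSketch`, `describeChannel`, and `setDebug`
are hypothetical, with `describeChannel` standing in for
`RemotingHelper.parseChannelRemoteAddr`.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class GuardedLogSketch {
    private static final Logger LOG = Logger.getLogger("GuardedLogSketch");

    // Counts how often the costly argument is actually built.
    static int describeCalls = 0;

    // Hypothetical stand-in for the address-formatting call on the hot path.
    static String describeChannel(String addr) {
        describeCalls++;
        return "remote=" + addr;
    }

    // Toggles the logger between FINE (debug-like) and INFO.
    static void setDebug(boolean on) {
        LOG.setLevel(on ? Level.FINE : Level.INFO);
    }

    static void onWritable(String addr, long bytesBeforeUnwritable) {
        // Without this guard the arguments would be evaluated on every call,
        // even when the message is later discarded by the level check.
        if (LOG.isLoggable(Level.FINE)) {
            LOG.log(Level.FINE, "Channel[{0}] turns writable, bytes before un-writable: {1}",
                new Object[] {describeChannel(addr), bytesBeforeUnwritable});
        }
    }
}
```

With the level at INFO, `onWritable` skips `describeChannel` entirely; with
FINE enabled it runs once per call.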
##########
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java:
##########
@@ -581,13 +581,17 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
if (channel.isWritable()) {
if (!channel.config().isAutoRead()) {
channel.config().setAutoRead(true);
- log.info("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}",
- RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable());
+ if (log.isDebugEnabled()) {
+ log.debug("Channel[{}] turns writable, bytes to buffer before changing channel to un-writable: {}",
+ RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeUnwritable());
+ }
}
} else {
channel.config().setAutoRead(false);
- log.warn("Channel[{}] auto-read is disabled, bytes to drain before it turns writable: {}",
- RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeWritable());
+ if (log.isDebugEnabled()) {
+ log.debug("Channel[{}] auto-read is disabled, bytes to drain before it turns writable: {}",
+ RemotingHelper.parseChannelRemoteAddr(channel), channel.bytesBeforeWritable());
+ }
Review Comment:
Same as above — intentional log level downgrade to eliminate hot-path
allocation. See reply on line 587.