This is an automated email from the ASF dual-hosted git repository.
RongtongJin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 5977e9cb1f [ISSUE #10522] Reduce Remoting header encoding allocation
via FastCodesHeader/RocketMQSerializable optimizations
5977e9cb1f is described below
commit 5977e9cb1f4616c7e308aaeb2d1af2b330385750
Author: Jiahua Wang <[email protected]>
AuthorDate: Fri Jun 19 09:41:19 2026 +0800
[ISSUE #10522] Reduce Remoting header encoding allocation via
FastCodesHeader/RocketMQSerializable optimizations
- Add writeDecimalLong/writeDecimalInt to RocketMQSerializable for direct
primitive decimal encoding
- Add writeLong/writeInt helpers to FastCodesHeader
- Optimize writeIfNotNull to route Long/Integer through primitive writers
- Cache single-byte ASCII strings in RocketMQSerializable.readStr
- Right-size HashMap initial capacity in mapDeserialize (128 -> 24)
Co-authored-by: wangjiahua.wjh <[email protected]>
---
.../remoting/protocol/FastCodesHeader.java | 18 ++++-
.../remoting/protocol/RocketMQSerializable.java | 79 +++++++++++++++++++++-
2 files changed, 95 insertions(+), 2 deletions(-)
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java
index ebf9930889..74cd49cb42 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/FastCodesHeader.java
@@ -38,10 +38,26 @@ public interface FastCodesHeader {
default void writeIfNotNull(ByteBuf out, String key, Object value) {
if (value != null) {
RocketMQSerializable.writeStr(out, true, key);
- RocketMQSerializable.writeStr(out, false, value.toString());
+ if (value instanceof Long) {
+ RocketMQSerializable.writeDecimalLong(out, (Long) value);
+ } else if (value instanceof Integer) {
+ RocketMQSerializable.writeDecimalInt(out, (Integer) value);
+ } else {
+ RocketMQSerializable.writeStr(out, false, value.toString());
+ }
}
}
+ default void writeLong(ByteBuf out, String key, long value) {
+ RocketMQSerializable.writeStr(out, true, key);
+ RocketMQSerializable.writeDecimalLong(out, value);
+ }
+
+ default void writeInt(ByteBuf out, String key, int value) {
+ RocketMQSerializable.writeStr(out, true, key);
+ RocketMQSerializable.writeDecimalInt(out, value);
+ }
+
void encode(ByteBuf out);
void decode(HashMap<String, String> fields) throws
RemotingCommandException;
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
index 25ebbaafd9..1a89abd990 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
@@ -28,6 +28,75 @@ import io.netty.buffer.ByteBuf;
public class RocketMQSerializable {
private static final Charset CHARSET_UTF8 = StandardCharsets.UTF_8;
+ private static final String[] SINGLE_BYTE_STRINGS = new String[128];
+ static {
+ for (int i = 0; i < 128; i++) {
+ SINGLE_BYTE_STRINGS[i] = String.valueOf((char) i);
+ }
+ }
+
+ public static void writeDecimalLong(ByteBuf buf, long value) {
+ int lenIndex = buf.writerIndex();
+ buf.writeInt(0);
+ int start = buf.writerIndex();
+ if (value == 0) {
+ buf.writeByte('0');
+ } else {
+ boolean neg = value < 0;
+ if (neg) {
+ buf.writeByte('-');
+ if (value == Long.MIN_VALUE) {
+ writePositiveDigits(buf, 922337203685477580L);
+ buf.writeByte('8');
+ buf.setInt(lenIndex, buf.writerIndex() - start);
+ return;
+ }
+ value = -value;
+ }
+ writePositiveDigits(buf, value);
+ }
+ buf.setInt(lenIndex, buf.writerIndex() - start);
+ }
+
+ public static void writeDecimalInt(ByteBuf buf, int value) {
+ int lenIndex = buf.writerIndex();
+ buf.writeInt(0);
+ int start = buf.writerIndex();
+ if (value == 0) {
+ buf.writeByte('0');
+ } else {
+ boolean neg = value < 0;
+ if (neg) {
+ buf.writeByte('-');
+ if (value == Integer.MIN_VALUE) {
+ writePositiveDigits(buf, 214748364L);
+ buf.writeByte('8');
+ buf.setInt(lenIndex, buf.writerIndex() - start);
+ return;
+ }
+ value = -value;
+ }
+ writePositiveDigits(buf, value);
+ }
+ buf.setInt(lenIndex, buf.writerIndex() - start);
+ }
+
+ private static void writePositiveDigits(ByteBuf buf, long value) {
+ int digitStart = buf.writerIndex();
+ int numDigits = 0;
+ long temp = value;
+ while (temp > 0) {
+ numDigits++;
+ temp /= 10;
+ }
+ buf.ensureWritable(numDigits);
+ buf.writerIndex(digitStart + numDigits);
+ temp = value;
+ for (int i = numDigits - 1; i >= 0; i--) {
+ buf.setByte(digitStart + i, (byte) ('0' + (int) (temp % 10)));
+ temp /= 10;
+ }
+ }
public static void writeStr(ByteBuf buf, boolean useShortLength, String
str) {
int lenIndex = buf.writerIndex();
@@ -52,6 +121,13 @@ public class RocketMQSerializable {
if (len > limit) {
throw new RemotingCommandException("string length exceed limit:" +
limit);
}
+ if (len == 1) {
+ byte b = buf.readByte();
+ if (b >= 0) {
+ return SINGLE_BYTE_STRINGS[b];
+ }
+ return new String(new byte[]{b}, CHARSET_UTF8);
+ }
CharSequence cs = buf.readCharSequence(len, StandardCharsets.UTF_8);
return cs == null ? null : cs.toString();
}
@@ -231,7 +307,8 @@ public class RocketMQSerializable {
public static HashMap<String, String> mapDeserialize(ByteBuf byteBuffer,
int len) throws RemotingCommandException {
- HashMap<String, String> map = new HashMap<>(128);
+ // Typical RocketMQ headers have 5-15 entries; 24 (next power-of-2
above 15*1.33) avoids resize
+ HashMap<String, String> map = new HashMap<>(24);
int endIndex = byteBuffer.readerIndex() + len;
while (byteBuffer.readerIndex() < endIndex) {