Copilot commented on code in PR #10528:
URL: https://github.com/apache/rocketmq/pull/10528#discussion_r3429301864


##########
store/src/main/java/org/apache/rocketmq/store/index/IndexService.java:
##########
@@ -49,6 +49,7 @@ public class IndexService implements CommitLogDispatchStore {
     private final String storePath;
     private final ArrayList<IndexFile> indexFileList = new ArrayList<>();
     private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    private final StringBuilder reusableKeyBuilder = new StringBuilder(128);

Review Comment:
   Using a single instance `StringBuilder` (`reusableKeyBuilder`) makes 
`buildKey(...)` not thread-safe. If `IndexService` methods are called 
concurrently, keys can be corrupted due to races on the shared builder. Prefer 
a local `StringBuilder`, a `ThreadLocal<StringBuilder>`, or guard access (e.g., 
synchronize) around builder mutation.



##########
store/src/main/java/org/apache/rocketmq/store/index/IndexService.java:
##########
@@ -215,10 +216,12 @@ public QueryOffsetResult queryOffset(String topic, String 
key, int maxNum, long
     }
 
     private String buildKey(final String topic, final String key) {
-        return topic + "#" + key;
+        reusableKeyBuilder.setLength(0);
+        return 
reusableKeyBuilder.append(topic).append('#').append(key).toString();
     }
     private String buildKey(final String topic, final String key, final String 
indexType) {
-        return topic + "#" + indexType + "#" + key;
+        reusableKeyBuilder.setLength(0);
+        return 
reusableKeyBuilder.append(topic).append('#').append(indexType).append('#').append(key).toString();

Review Comment:
   Using a single instance `StringBuilder` (`reusableKeyBuilder`) makes 
`buildKey(...)` not thread-safe. If `IndexService` methods are called 
concurrently, keys can be corrupted due to races on the shared builder. Prefer 
a local `StringBuilder`, a `ThreadLocal<StringBuilder>`, or guard access (e.g., 
synchronize) around builder mutation.



##########
store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java:
##########
@@ -464,47 +470,92 @@ public void incGroupAckNums(final String group, final 
String topic, final int in
     }
 
     public String buildStatsKey(String topic, String group) {
-        StringBuilder strBuilder;
         if (topic != null && group != null) {
-            strBuilder = new StringBuilder(topic.length() + group.length() + 
1);
-        } else {
-            strBuilder = new StringBuilder();
+            ConcurrentHashMap<String, String> groupMap = 
statsKeyByGroupCache.get(topic);
+            if (groupMap != null) {
+                String cached = groupMap.get(group);
+                if (cached != null) {
+                    return cached;
+                }
+            }
+            String key = topic + "@" + group;
+            statsKeyByGroupCache.computeIfAbsent(topic, t -> new 
ConcurrentHashMap<>()).put(group, key);
+            return key;
         }
-        strBuilder.append(topic).append("@").append(group);
-        return strBuilder.toString();
+        return topic + "@" + group;
     }
 
     public String buildStatsKey(String topic, int queueId) {
-        StringBuilder strBuilder;
-        if (topic != null) {
-            strBuilder = new StringBuilder(topic.length() + 5);
-        } else {
-            strBuilder = new StringBuilder();
+        String[] keys = statsKeyByQueueCache.get(topic);
+        if (keys != null && queueId >= 0 && queueId < keys.length) {
+            String cached = keys[queueId];
+            if (cached != null) {
+                return cached;
+            }
         }
-        strBuilder.append(topic).append("@").append(queueId);
-        return strBuilder.toString();
+        String key = topic + "@" + queueId;

Review Comment:
   `ConcurrentHashMap` does not permit `null` keys. `buildStatsKey(String, 
int)` and `getLatencyKey(...)` now call `...Cache.get(topic)` without guarding 
`topic != null`, which can throw `NullPointerException` where the previous 
implementation tolerated `topic == null`. Add an early return for `topic == 
null` before accessing the cache (or skip caching in that case).



##########
store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java:
##########
@@ -301,6 +310,7 @@ public void putSlot(long timeMs, long firstPos, long 
lastPos, int num, int magic
         localBuffer.get().putLong(lastPos);
         localBuffer.get().putInt(num);
         localBuffer.get().putInt(magic);
+        dirty = true;

Review Comment:
   The `dirty` flag can lose updates under concurrency: during `flush()`, 
another thread can set `dirty = true` after the `if (!dirty)` check but before 
`dirty = false` at the end, causing the write to be missed and the wheel state 
not persisted. Use an atomic state transition (e.g., 
`AtomicBoolean.compareAndSet(true, false)` at flush start) or 
synchronize/serialize modifications and flush so `dirty` cannot be cleared 
while writes are in-flight.



##########
store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java:
##########
@@ -313,11 +323,13 @@ public void reviseSlot(long timeMs, long firstPos, long 
lastPos, boolean force)
         } else {
             if (IGNORE != firstPos) {
                 localBuffer.get().putLong(firstPos);
+                dirty = true;

Review Comment:
   The `dirty` flag can lose updates under concurrency: during `flush()`, 
another thread can set `dirty = true` after the `if (!dirty)` check but before 
`dirty = false` at the end, causing the write to be missed and the wheel state 
not persisted. Use an atomic state transition (e.g., 
`AtomicBoolean.compareAndSet(true, false)` at flush start) or 
synchronize/serialize modifications and flush so `dirty` cannot be cleared 
while writes are in-flight.



##########
store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java:
##########
@@ -380,32 +386,32 @@ public void onGroupDeleted(final String group) {
         this.momentStatsItemSetFallTime.delValueBySuffixKey(group, "@");
     }
 
-    public void incQueuePutNums(final String topic, final Integer queueId) {
+    public void incQueuePutNums(final String topic, final int queueId) {
         if (enableQueueStat) {
             
this.statsTable.get(Stats.QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, 
queueId), 1, 1);
         }
     }
 
-    public void incQueuePutNums(final String topic, final Integer queueId, int 
num, int times) {
+    public void incQueuePutNums(final String topic, final int queueId, int 
num, int times) {
         if (enableQueueStat) {
             
this.statsTable.get(Stats.QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, 
queueId), num, times);
         }
     }
 
-    public void incQueuePutSize(final String topic, final Integer queueId, 
final int size) {
+    public void incQueuePutSize(final String topic, final int queueId, final 
int size) {
         if (enableQueueStat) {
             
this.statsTable.get(Stats.QUEUE_PUT_SIZE).addValue(buildStatsKey(topic, 
queueId), size, 1);
         }
     }
 
-    public void incQueueGetNums(final String group, final String topic, final 
Integer queueId, final int incValue) {
+    public void incQueueGetNums(final String group, final String topic, final 
int queueId, final int incValue) {

Review Comment:
   Changing public method parameters from `Integer` to primitive `int` is a 
source/binary compatibility change for callers compiled against the old 
signature, and also removes the ability to pass `null` (previously would 
produce a string key like `topic@null`). If this is a public API, consider 
keeping overloads with `Integer` (possibly deprecated) or performing the 
migration in a compatibility-friendly way.



##########
store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java:
##########
@@ -313,11 +323,13 @@ public void reviseSlot(long timeMs, long firstPos, long 
lastPos, boolean force)
         } else {
             if (IGNORE != firstPos) {
                 localBuffer.get().putLong(firstPos);
+                dirty = true;
             } else {
                 localBuffer.get().getLong();
             }
             if (IGNORE != lastPos) {
                 localBuffer.get().putLong(lastPos);
+                dirty = true;

Review Comment:
   The `dirty` flag can lose updates under concurrency: during `flush()`, 
another thread can set `dirty = true` after the `if (!dirty)` check but before 
`dirty = false` at the end, causing the write to be missed and the wheel state 
not persisted. Use an atomic state transition (e.g., 
`AtomicBoolean.compareAndSet(true, false)` at flush start) or 
synchronize/serialize modifications and flush so `dirty` cannot be cleared 
while writes are in-flight.



##########
store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java:
##########
@@ -55,6 +55,8 @@ protected ByteBuffer initialValue() {
     };
     private final int wheelLength;
 
+    private volatile boolean dirty;

Review Comment:
   The `dirty` flag can lose updates under concurrency: during `flush()`, 
another thread can set `dirty = true` after the `if (!dirty)` check but before 
`dirty = false` at the end, causing the write to be missed and the wheel state 
not persisted. Use an atomic state transition (e.g., 
`AtomicBoolean.compareAndSet(true, false)` at flush start) or 
synchronize/serialize modifications and flush so `dirty` cannot be cleared 
while writes are in-flight.



##########
store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java:
##########
@@ -128,17 +130,25 @@ public void flush() {
         if (mappedByteBuffer == null) {
             return;
         }
+        if (!dirty) {
+            return;
+        }
         ByteBuffer bf = localBuffer.get();
-        bf.position(0);
-        bf.limit(wheelLength);
-        mappedByteBuffer.position(0);
-        mappedByteBuffer.limit(wheelLength);
-        for (int i = 0; i < wheelLength; i++) {
-            if (bf.get(i) != mappedByteBuffer.get(i)) {
-                mappedByteBuffer.put(i, bf.get(i));
+        int longAligned = wheelLength & ~7;

Review Comment:
   The `dirty` flag can lose updates under concurrency: during `flush()`, 
another thread can set `dirty = true` after the `if (!dirty)` check but before 
`dirty = false` at the end, causing the write to be missed and the wheel state 
not persisted. Use an atomic state transition (e.g., 
`AtomicBoolean.compareAndSet(true, false)` at flush start) or 
synchronize/serialize modifications and flush so `dirty` cannot be cleared 
while writes are in-flight.



##########
store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java:
##########
@@ -287,11 +297,10 @@ public int getSlotIndex(long timeMs) {
 
     public void putSlot(long timeMs, long firstPos, long lastPos) {
         localBuffer.get().position(getSlotIndex(timeMs) * Slot.SIZE);
-        // To be compatible with previous version.
-        // The previous version's precision is fixed at 1000ms and it store 
timeMs / 1000 in slot.
         localBuffer.get().putLong(timeMs / precisionMs);
         localBuffer.get().putLong(firstPos);
         localBuffer.get().putLong(lastPos);
+        dirty = true;

Review Comment:
   The `dirty` flag can lose updates under concurrency: during `flush()`, 
another thread can set `dirty = true` after the `if (!dirty)` check but before 
`dirty = false` at the end, causing the write to be missed and the wheel state 
not persisted. Use an atomic state transition (e.g., 
`AtomicBoolean.compareAndSet(true, false)` at flush start) or 
synchronize/serialize modifications and flush so `dirty` cannot be cleared 
while writes are in-flight.



##########
store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java:
##########
@@ -128,17 +130,25 @@ public void flush() {
         if (mappedByteBuffer == null) {
             return;
         }
+        if (!dirty) {
+            return;
+        }
         ByteBuffer bf = localBuffer.get();
-        bf.position(0);
-        bf.limit(wheelLength);
-        mappedByteBuffer.position(0);
-        mappedByteBuffer.limit(wheelLength);
-        for (int i = 0; i < wheelLength; i++) {
-            if (bf.get(i) != mappedByteBuffer.get(i)) {
-                mappedByteBuffer.put(i, bf.get(i));
+        int longAligned = wheelLength & ~7;
+        for (int i = 0; i < longAligned; i += 8) {
+            long local = bf.getLong(i);
+            if (local != mappedByteBuffer.getLong(i)) {
+                mappedByteBuffer.putLong(i, local);
+            }
+        }
+        for (int i = longAligned; i < wheelLength; i++) {
+            byte b = bf.get(i);
+            if (b != mappedByteBuffer.get(i)) {
+                mappedByteBuffer.put(i, b);
             }
         }
         this.mappedByteBuffer.force();
+        dirty = false;

Review Comment:
   The `dirty` flag can lose updates under concurrency: during `flush()`, 
another thread can set `dirty = true` after the `if (!dirty)` check but before 
`dirty = false` at the end, causing the write to be missed and the wheel state 
not persisted. Use an atomic state transition (e.g., 
`AtomicBoolean.compareAndSet(true, false)` at flush start) or 
synchronize/serialize modifications and flush so `dirty` cannot be cleared 
while writes are in-flight.



##########
store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java:
##########
@@ -380,32 +386,32 @@ public void onGroupDeleted(final String group) {
         this.momentStatsItemSetFallTime.delValueBySuffixKey(group, "@");
     }
 
-    public void incQueuePutNums(final String topic, final Integer queueId) {
+    public void incQueuePutNums(final String topic, final int queueId) {
         if (enableQueueStat) {
             
this.statsTable.get(Stats.QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, 
queueId), 1, 1);
         }
     }
 
-    public void incQueuePutNums(final String topic, final Integer queueId, int 
num, int times) {
+    public void incQueuePutNums(final String topic, final int queueId, int 
num, int times) {
         if (enableQueueStat) {
             
this.statsTable.get(Stats.QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, 
queueId), num, times);
         }
     }
 
-    public void incQueuePutSize(final String topic, final Integer queueId, 
final int size) {
+    public void incQueuePutSize(final String topic, final int queueId, final 
int size) {
         if (enableQueueStat) {
             
this.statsTable.get(Stats.QUEUE_PUT_SIZE).addValue(buildStatsKey(topic, 
queueId), size, 1);
         }
     }
 
-    public void incQueueGetNums(final String group, final String topic, final 
Integer queueId, final int incValue) {
+    public void incQueueGetNums(final String group, final String topic, final 
int queueId, final int incValue) {
         if (enableQueueStat) {
             final String statsKey = buildStatsKey(topic, queueId, group);
             this.statsTable.get(Stats.QUEUE_GET_NUMS).addValue(statsKey, 
incValue, 1);
         }
     }
 
-    public void incQueueGetSize(final String group, final String topic, final 
Integer queueId, final int incValue) {
+    public void incQueueGetSize(final String group, final String topic, final 
int queueId, final int incValue) {

Review Comment:
   Changing public method parameters from `Integer` to primitive `int` is a 
source/binary compatibility change for callers compiled against the old 
signature, and also removes the ability to pass `null` (previously would 
produce a string key like `topic@null`). If this is a public API, consider 
keeping overloads with `Integer` (possibly deprecated) or performing the 
migration in a compatibility-friendly way.



##########
store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java:
##########
@@ -523,14 +574,30 @@ public void incGroupGetLatency(final String group, final 
String topic, final int
     }
 
     public void incTopicPutLatency(final String topic, final int queueId, 
final int incValue) {
-        StringBuilder statsKey;
+        String statsKey = getLatencyKey(topic, queueId);
+        this.statsTable.get(Stats.TOPIC_PUT_LATENCY).addValue(statsKey, 
incValue, 1);
+    }
+
+    private String getLatencyKey(final String topic, final int queueId) {
+        String[] keys = latencyKeyCache.get(topic);
+        if (keys != null && queueId < keys.length) {
+            String k = keys[queueId];
+            if (k != null) {
+                return k;
+            }
+        }

Review Comment:
   `ConcurrentHashMap` does not permit `null` keys. `buildStatsKey(String, 
int)` and `getLatencyKey(...)` now call `...Cache.get(topic)` without guarding 
`topic != null`, which can throw `NullPointerException` where the previous 
implementation tolerated `topic == null`. Add an early return for `topic == 
null` before accessing the cache (or skip caching in that case).



##########
store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java:
##########
@@ -380,32 +386,32 @@ public void onGroupDeleted(final String group) {
         this.momentStatsItemSetFallTime.delValueBySuffixKey(group, "@");
     }
 
-    public void incQueuePutNums(final String topic, final Integer queueId) {
+    public void incQueuePutNums(final String topic, final int queueId) {

Review Comment:
   Changing public method parameters from `Integer` to primitive `int` is a 
source/binary compatibility change for callers compiled against the old 
signature, and also removes the ability to pass `null` (previously would 
produce a string key like `topic@null`). If this is a public API, consider 
keeping overloads with `Integer` (possibly deprecated) or performing the 
migration in a compatibility-friendly way.



##########
store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java:
##########
@@ -380,32 +386,32 @@ public void onGroupDeleted(final String group) {
         this.momentStatsItemSetFallTime.delValueBySuffixKey(group, "@");
     }
 
-    public void incQueuePutNums(final String topic, final Integer queueId) {
+    public void incQueuePutNums(final String topic, final int queueId) {
         if (enableQueueStat) {
             
this.statsTable.get(Stats.QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, 
queueId), 1, 1);
         }
     }
 
-    public void incQueuePutNums(final String topic, final Integer queueId, int 
num, int times) {
+    public void incQueuePutNums(final String topic, final int queueId, int 
num, int times) {
         if (enableQueueStat) {
             
this.statsTable.get(Stats.QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, 
queueId), num, times);
         }
     }
 
-    public void incQueuePutSize(final String topic, final Integer queueId, 
final int size) {
+    public void incQueuePutSize(final String topic, final int queueId, final 
int size) {

Review Comment:
   Changing public method parameters from `Integer` to primitive `int` is a 
source/binary compatibility change for callers compiled against the old 
signature, and also removes the ability to pass `null` (previously would 
produce a string key like `topic@null`). If this is a public API, consider 
keeping overloads with `Integer` (possibly deprecated) or performing the 
migration in a compatibility-friendly way.



##########
store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java:
##########
@@ -141,6 +142,11 @@ public class BrokerStatsManager {
     private ScheduledExecutorService cleanResourceExecutor;
 
     private final HashMap<String, StatsItemSet> statsTable = new HashMap<>();
+    private final ConcurrentHashMap<String, String[]> latencyKeyCache = new 
ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, String[]> statsKeyByQueueCache = 
new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> 
statsKeyByGroupCache = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, ConcurrentHashMap<String, 
String[]>> statsKeyByTopicQueueGroupCache = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, ConcurrentHashMap<String, 
String[]>> statsKeyByQueueTopicGroupCache = new ConcurrentHashMap<>();

Review Comment:
   These caches are unbounded and appear to retain entries indefinitely per 
topic/group/queue, which can lead to memory growth in long-running brokers with 
high churn or many unique groups/topics. Consider clearing relevant cache 
entries in existing lifecycle hooks (e.g., on topic/group deletion) and/or 
introducing size bounds/eviction to prevent unbounded retention.



##########
store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java:
##########
@@ -380,32 +386,32 @@ public void onGroupDeleted(final String group) {
         this.momentStatsItemSetFallTime.delValueBySuffixKey(group, "@");
     }
 
-    public void incQueuePutNums(final String topic, final Integer queueId) {
+    public void incQueuePutNums(final String topic, final int queueId) {
         if (enableQueueStat) {
             
this.statsTable.get(Stats.QUEUE_PUT_NUMS).addValue(buildStatsKey(topic, 
queueId), 1, 1);
         }
     }
 
-    public void incQueuePutNums(final String topic, final Integer queueId, int 
num, int times) {
+    public void incQueuePutNums(final String topic, final int queueId, int 
num, int times) {

Review Comment:
   Changing public method parameters from `Integer` to primitive `int` is a 
source/binary compatibility change for callers compiled against the old 
signature, and also removes the ability to pass `null` (previously would 
produce a string key like `topic@null`). If this is a public API, consider 
keeping overloads with `Integer` (possibly deprecated) or performing the 
migration in a compatibility-friendly way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to