This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 541fdad0ff [ISSUE #9947] Fix TimerMessageStore.checkAndReviseMetrics throws BufferUnderflowException (#9948)
541fdad0ff is described below
commit 541fdad0ff91990530da0a09e834edffb73ab23c
Author: gaoyf <[email protected]>
AuthorDate: Wed Dec 24 11:27:59 2025 +0800
[ISSUE #9947] Fix TimerMessageStore.checkAndReviseMetrics throws BufferUnderflowException (#9948)
---
.../rocketmq/store/timer/TimerMessageStore.java | 3 ++
.../apache/rocketmq/store/timer/TimerLogTest.java | 36 ++++++++++++++++++++++
2 files changed, 39 insertions(+)
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 5fee8da6d0..cd5e9f4480 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -1344,6 +1344,9 @@ public class TimerMessageStore {
bf.getInt();//size
bf.getLong();//prev pos
int magic = bf.getInt(); //magic
+ if (magic == TimerLog.BLANK_MAGIC_CODE) {
+ break;
+ }
long enqueueTime = bf.getLong();
long delayedTime = bf.getInt() + enqueueTime;
long offsetPy = bf.getLong();
diff --git a/store/src/test/java/org/apache/rocketmq/store/timer/TimerLogTest.java b/store/src/test/java/org/apache/rocketmq/store/timer/TimerLogTest.java
index 112c3ad46b..ed7ff63427 100644
--- a/store/src/test/java/org/apache/rocketmq/store/timer/TimerLogTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/timer/TimerLogTest.java
@@ -97,6 +97,42 @@ public class TimerLogTest {
assertArrayEquals(expect, data);
}
+ @Test
+ public void testAppendBlankByteBuffer() throws Exception {
+ TimerLog timerLog = createTimerLog(null);
+ ByteBuffer byteBuffer = ByteBuffer.allocate(TimerLog.UNIT_SIZE);
+ byteBuffer.putInt(TimerLog.UNIT_SIZE);
+ byteBuffer.putLong(Long.MAX_VALUE);
+ byteBuffer.putInt(0);
+ byteBuffer.putLong(Long.MAX_VALUE);
+ byteBuffer.putInt(0);
+ byteBuffer.putLong(1000);
+ byteBuffer.putInt(10);
+ byteBuffer.putInt(123);
+ byteBuffer.putInt(0);
+ int maxAppend = 1024 / TimerLog.UNIT_SIZE + 1;
+ for (int i = 0; i < maxAppend; i++) {
+ timerLog.append(byteBuffer.array(), 0, TimerLog.UNIT_SIZE);
+ }
+ SelectMappedBufferResult sbr = timerLog.getWholeBuffer(0);
+ ByteBuffer bf = sbr.getByteBuffer();
+ for (int position = 0; position < sbr.getSize(); position += TimerLog.UNIT_SIZE) {
+ bf.position(position);
+ bf.getInt();
+ bf.getLong();
+ int magic = bf.getInt();
+ if (position / TimerLog.UNIT_SIZE == maxAppend - 1) {
+ assertEquals(TimerLog.BLANK_MAGIC_CODE, magic);
+ continue;
+ }
+ bf.getLong();
+ bf.getInt();
+ bf.getLong();
+ bf.getInt();
+ bf.getInt();
+ }
+ }
+
@After
public void shutdown() {
for (TimerLog timerLog : timerLogs) {