This is an automated email from the ASF dual-hosted git repository.
scarb pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 74ab3ae112 [ISSUE #9648] Fix getOffsetInQueueByTime missing
boundaryType in tieredMessageStore (#9649)
74ab3ae112 is described below
commit 74ab3ae112c8ded6493677a29ad5da6212596be9
Author: Duxuwei <[email protected]>
AuthorDate: Tue Sep 2 19:49:40 2025 +0800
[ISSUE #9648] Fix getOffsetInQueueByTime missing boundaryType in
tieredMessageStore (#9649)
* [ISSUE #9648] Fix getOffsetInQueueByTime missing in tieredMessageStore
* Update TieredMessageStoreTest.java
* Delete inappropriate UT
* Remove unused import
---
.../store/MessageStoreStateMachineTest.java | 38 ----------------------
.../rocketmq/tieredstore/TieredMessageStore.java | 4 +--
.../tieredstore/TieredMessageStoreTest.java | 33 ++++++++++++++++---
3 files changed, 30 insertions(+), 45 deletions(-)
diff --git
a/store/src/test/java/org/apache/rocketmq/store/MessageStoreStateMachineTest.java
b/store/src/test/java/org/apache/rocketmq/store/MessageStoreStateMachineTest.java
index d00074dbcb..333e419681 100644
---
a/store/src/test/java/org/apache/rocketmq/store/MessageStoreStateMachineTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/MessageStoreStateMachineTest.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.store;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.anyLong;
@@ -107,41 +106,4 @@ public class MessageStoreStateMachineTest {
// Verify the current state
assertEquals(MessageStoreState.INIT, stateMachine.getCurrentState());
}
-
- /**
- * Test getTotalRunningTimeMs method.
- */
- @Test
- public void testGetTotalRunningTimeMs() {
- // Sleep for a short duration to simulate elapsed time
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
-
- // Verify the total running time is approximately correct
- long totalTime = stateMachine.getTotalRunningTimeMs();
- assertTrue(totalTime >= 100 && totalTime < 200);
- }
-
- /**
- * Test getCurrentStateRunningTimeMs method.
- */
- @Test
- public void testGetCurrentStateRunningTimeMs() {
- // Perform a state transition
- stateMachine.transitTo(MessageStoreState.LOAD_COMMITLOG_OK);
-
- // Sleep for a short duration to simulate elapsed time
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
-
- // Verify the current state running time is approximately correct
- long currentStateTime = stateMachine.getCurrentStateRunningTimeMs();
- assertTrue(currentStateTime >= 100 && currentStateTime < 200);
- }
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index f1c935d00b..19b587fa32 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -370,11 +370,11 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
.build();
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS),
latencyAttributes);
if (offsetInTieredStore == -1L && !isForce) {
- return next.getOffsetInQueueByTime(topic, queueId, timestamp);
+ return next.getOffsetInQueueByTime(topic, queueId, timestamp,
boundaryType);
}
return offsetInTieredStore;
}
- return next.getOffsetInQueueByTime(topic, queueId, timestamp);
+ return next.getOffsetInQueueByTime(topic, queueId, timestamp,
boundaryType);
}
@Override
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
index bb259ae811..1a0240681c 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
@@ -62,7 +62,6 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
public class TieredMessageStoreTest {
@@ -275,19 +274,43 @@ public class TieredMessageStoreTest {
@Test
public void testGetOffsetInQueueByTime() {
+ final long earliestMsgTime = 100L;
Properties properties = new Properties();
properties.setProperty("tieredStorageLevel", "FORCE");
configuration.update(properties);
- Mockito.when(fetcher.getOffsetInQueueByTime(anyString(), anyInt(),
anyLong(), eq(BoundaryType.LOWER))).thenReturn(1L);
- Mockito.when(defaultStore.getOffsetInQueueByTime(anyString(),
anyInt(), anyLong())).thenReturn(2L);
- Mockito.when(defaultStore.getEarliestMessageTime()).thenReturn(100L);
+ Mockito.when(fetcher.getOffsetInQueueByTime(anyString(), anyInt(),
anyLong(), any(BoundaryType.class)))
+ .thenAnswer(ivk -> ivk.getArgument(3, BoundaryType.class) ==
BoundaryType.LOWER ? 1L : 2L);
+ Mockito.when(defaultStore.getOffsetInQueueByTime(anyString(),
anyInt(), anyLong(), any(BoundaryType.class)))
+ .thenAnswer(ivk -> {
+ long time = ivk.getArgument(2, Long.class);
+ if (time < earliestMsgTime) {
+ return -1L;
+ }
+ return ivk.getArgument(3, BoundaryType.class) ==
BoundaryType.LOWER ? 3L : 4L;
+ });
+
Mockito.when(defaultStore.getEarliestMessageTime()).thenReturn(earliestMsgTime);
+
+ // Message not in disk, but force, found in tired storage.
Assert.assertEquals(1L,
currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 1000,
BoundaryType.LOWER));
+ Assert.assertEquals(2L,
currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 1000,
BoundaryType.UPPER));
+ // Message in disk, and force, found in tired storage.
Assert.assertEquals(1L,
currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0,
BoundaryType.LOWER));
+ Assert.assertEquals(2L,
currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0,
BoundaryType.UPPER));
- Mockito.when(fetcher.getOffsetInQueueByTime(anyString(), anyInt(),
anyLong(), eq(BoundaryType.LOWER))).thenReturn(-1L);
+ // Message in disk, but force, and not found in tired storage.
+ Mockito.when(fetcher.getOffsetInQueueByTime(anyString(), anyInt(),
anyLong(), any(BoundaryType.class))).thenReturn(-1L);
Assert.assertEquals(-1L,
currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0));
Assert.assertEquals(-1L,
currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0,
BoundaryType.LOWER));
+
+ properties.setProperty("tieredStorageLevel", "NOT_IN_DISK");
+ configuration.update(properties);
+ // Message not in disk, and not found in tired storage.
+ Assert.assertEquals(-1L,
currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0,
BoundaryType.LOWER));
+ Assert.assertEquals(-1L,
currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0,
BoundaryType.UPPER));
+ // Message in disk, and found in disk.
+ Assert.assertEquals(3L,
currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 1000,
BoundaryType.LOWER));
+ Assert.assertEquals(4L,
currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 1000,
BoundaryType.UPPER));
}
@Test