This is an automated email from the ASF dual-hosted git repository.

scarb pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 74ab3ae112 [ISSUE #9648] Fix getOffsetInQueueByTime missing 
boundaryType in tieredMessageStore (#9649)
74ab3ae112 is described below

commit 74ab3ae112c8ded6493677a29ad5da6212596be9
Author: Duxuwei <[email protected]>
AuthorDate: Tue Sep 2 19:49:40 2025 +0800

    [ISSUE #9648] Fix getOffsetInQueueByTime missing boundaryType in 
tieredMessageStore (#9649)
    
    * [ISSUE #9648] Fix getOffsetInQueueByTime missing in tieredMessageStore
    
    * Update TieredMessageStoreTest.java
    
    * Delete inappropriate UT
    
    * Remove unused import
---
 .../store/MessageStoreStateMachineTest.java        | 38 ----------------------
 .../rocketmq/tieredstore/TieredMessageStore.java   |  4 +--
 .../tieredstore/TieredMessageStoreTest.java        | 33 ++++++++++++++++---
 3 files changed, 30 insertions(+), 45 deletions(-)

diff --git 
a/store/src/test/java/org/apache/rocketmq/store/MessageStoreStateMachineTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/MessageStoreStateMachineTest.java
index d00074dbcb..333e419681 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/MessageStoreStateMachineTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/MessageStoreStateMachineTest.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.store;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.anyLong;
@@ -107,41 +106,4 @@ public class MessageStoreStateMachineTest {
         // Verify the current state
         assertEquals(MessageStoreState.INIT, stateMachine.getCurrentState());
     }
-
-    /**
-     * Test getTotalRunningTimeMs method.
-     */
-    @Test
-    public void testGetTotalRunningTimeMs() {
-        // Sleep for a short duration to simulate elapsed time
-        try {
-            Thread.sleep(100);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-
-        // Verify the total running time is approximately correct
-        long totalTime = stateMachine.getTotalRunningTimeMs();
-        assertTrue(totalTime >= 100 && totalTime < 200);
-    }
-
-    /**
-     * Test getCurrentStateRunningTimeMs method.
-     */
-    @Test
-    public void testGetCurrentStateRunningTimeMs() {
-        // Perform a state transition
-        stateMachine.transitTo(MessageStoreState.LOAD_COMMITLOG_OK);
-
-        // Sleep for a short duration to simulate elapsed time
-        try {
-            Thread.sleep(100);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-
-        // Verify the current state running time is approximately correct
-        long currentStateTime = stateMachine.getCurrentStateRunningTimeMs();
-        assertTrue(currentStateTime >= 100 && currentStateTime < 200);
-    }
 }
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index f1c935d00b..19b587fa32 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -370,11 +370,11 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
                 .build();
             
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS),
 latencyAttributes);
             if (offsetInTieredStore == -1L && !isForce) {
-                return next.getOffsetInQueueByTime(topic, queueId, timestamp);
+                return next.getOffsetInQueueByTime(topic, queueId, timestamp, 
boundaryType);
             }
             return offsetInTieredStore;
         }
-        return next.getOffsetInQueueByTime(topic, queueId, timestamp);
+        return next.getOffsetInQueueByTime(topic, queueId, timestamp, 
boundaryType);
     }
 
     @Override
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
index bb259ae811..1a0240681c 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
@@ -62,7 +62,6 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.when;
 
 public class TieredMessageStoreTest {
@@ -275,19 +274,43 @@ public class TieredMessageStoreTest {
 
     @Test
     public void testGetOffsetInQueueByTime() {
+        final long earliestMsgTime = 100L;
         Properties properties = new Properties();
         properties.setProperty("tieredStorageLevel", "FORCE");
         configuration.update(properties);
 
-        Mockito.when(fetcher.getOffsetInQueueByTime(anyString(), anyInt(), 
anyLong(), eq(BoundaryType.LOWER))).thenReturn(1L);
-        Mockito.when(defaultStore.getOffsetInQueueByTime(anyString(), 
anyInt(), anyLong())).thenReturn(2L);
-        Mockito.when(defaultStore.getEarliestMessageTime()).thenReturn(100L);
+        Mockito.when(fetcher.getOffsetInQueueByTime(anyString(), anyInt(), 
anyLong(), any(BoundaryType.class)))
+            .thenAnswer(ivk -> ivk.getArgument(3, BoundaryType.class) == 
BoundaryType.LOWER ? 1L : 2L);
+        Mockito.when(defaultStore.getOffsetInQueueByTime(anyString(), 
anyInt(), anyLong(), any(BoundaryType.class)))
+            .thenAnswer(ivk -> {
+                long time = ivk.getArgument(2, Long.class);
+                if (time < earliestMsgTime) {
+                    return -1L;
+                }
+                return ivk.getArgument(3, BoundaryType.class) == 
BoundaryType.LOWER ? 3L : 4L;
+            });
+        
Mockito.when(defaultStore.getEarliestMessageTime()).thenReturn(earliestMsgTime);
+
+        // Message not in disk, but force, found in tired storage.
         Assert.assertEquals(1L, 
currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 1000, 
BoundaryType.LOWER));
+        Assert.assertEquals(2L, 
currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 1000, 
BoundaryType.UPPER));
+        // Message in disk, and force, found in tired storage.
         Assert.assertEquals(1L, 
currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0, 
BoundaryType.LOWER));
+        Assert.assertEquals(2L, 
currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0, 
BoundaryType.UPPER));
 
-        Mockito.when(fetcher.getOffsetInQueueByTime(anyString(), anyInt(), 
anyLong(), eq(BoundaryType.LOWER))).thenReturn(-1L);
+        // Message in disk, but force, and not found in tired storage.
+        Mockito.when(fetcher.getOffsetInQueueByTime(anyString(), anyInt(), 
anyLong(), any(BoundaryType.class))).thenReturn(-1L);
         Assert.assertEquals(-1L, 
currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0));
         Assert.assertEquals(-1L, 
currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0, 
BoundaryType.LOWER));
+
+        properties.setProperty("tieredStorageLevel", "NOT_IN_DISK");
+        configuration.update(properties);
+        // Message not in disk, and not found in tired storage.
+        Assert.assertEquals(-1L, 
currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0, 
BoundaryType.LOWER));
+        Assert.assertEquals(-1L, 
currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 0, 
BoundaryType.UPPER));
+        // Message in disk, and found in disk.
+        Assert.assertEquals(3L, 
currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 1000, 
BoundaryType.LOWER));
+        Assert.assertEquals(4L, 
currentStore.getOffsetInQueueByTime(mq.getTopic(), mq.getQueueId(), 1000, 
BoundaryType.UPPER));
     }
 
     @Test

Reply via email to