This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 932588d3e1 [ISSUE #10201] Optimize queryOffset method overloads in
IndexService (#10202)
932588d3e1 is described below
commit 932588d3e17401e130c339bb1b2369ec2c4d8b8d
Author: yx9o <[email protected]>
AuthorDate: Wed Mar 25 15:24:21 2026 +0800
[ISSUE #10201] Optimize queryOffset method overloads in IndexService
(#10202)
---
.../apache/rocketmq/store/index/IndexService.java | 41 ++--------------
.../rocketmq/store/index/IndexServiceTest.java | 57 +++++++++++++++++-----
2 files changed, 49 insertions(+), 49 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
index 4c28d2a355..1a180e4442 100644
--- a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
@@ -167,49 +167,14 @@ public class IndexService implements
CommitLogDispatchStore {
}
public QueryOffsetResult queryOffset(String topic, String key, int maxNum,
long begin, long end) {
- long indexLastUpdateTimestamp = 0;
- long indexLastUpdatePhyoffset = 0;
- maxNum = Math.min(maxNum,
this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
- List<Long> phyOffsets = new ArrayList<>(maxNum);
- try {
- this.readWriteLock.readLock().lock();
- if (!this.indexFileList.isEmpty()) {
- for (int i = this.indexFileList.size(); i > 0; i--) {
- IndexFile f = this.indexFileList.get(i - 1);
- boolean lastFile = i == this.indexFileList.size();
- if (lastFile) {
- indexLastUpdateTimestamp = f.getEndTimestamp();
- indexLastUpdatePhyoffset = f.getEndPhyOffset();
- }
-
- if (f.isTimeMatched(begin, end)) {
-
- f.selectPhyOffset(phyOffsets, buildKey(topic, key),
maxNum, begin, end);
- }
-
- if (f.getBeginTimestamp() < begin) {
- break;
- }
-
- if (phyOffsets.size() >= maxNum) {
- break;
- }
- }
- }
- } catch (Exception e) {
- LOGGER.error("queryMsg exception", e);
- } finally {
- this.readWriteLock.readLock().unlock();
- }
-
- return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp,
indexLastUpdatePhyoffset);
+ return queryOffset(topic, key, maxNum, begin, end, null);
}
public QueryOffsetResult queryOffset(String topic, String key, int maxNum,
long begin, long end, String indexType) {
- List<Long> phyOffsets = new ArrayList<>(maxNum);
long indexLastUpdateTimestamp = 0;
long indexLastUpdatePhyoffset = 0;
maxNum = Math.min(maxNum,
this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
+ List<Long> phyOffsets = new ArrayList<>(maxNum);
try {
this.readWriteLock.readLock().lock();
if (!this.indexFileList.isEmpty()) {
@@ -241,7 +206,7 @@ public class IndexService implements CommitLogDispatchStore
{
}
}
} catch (Exception e) {
- LOGGER.error("queryMsg queryOffset exception", e);
+ LOGGER.error("queryOffset exception", e);
} finally {
this.readWriteLock.readLock().unlock();
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/index/IndexServiceTest.java
b/store/src/test/java/org/apache/rocketmq/store/index/IndexServiceTest.java
index 057bbfd0e1..bd520e6a48 100644
--- a/store/src/test/java/org/apache/rocketmq/store/index/IndexServiceTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/index/IndexServiceTest.java
@@ -21,29 +21,64 @@ import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.Before;
import org.junit.Test;
+import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-
public class IndexServiceTest {
+ private IndexService indexService;
+
+ @Before
+ public void setUp() throws Exception {
+ DefaultMessageStore store = new DefaultMessageStore(
+ new MessageStoreConfig(),
+ new BrokerStatsManager(new BrokerConfig()),
+ null,
+ new BrokerConfig(),
+ new ConcurrentHashMap<>()
+ );
+ indexService = new IndexService(store);
+ }
+
@Test
- public void testQueryOffsetThrow() throws Exception {
+ public void testQueryOffsetThrow() {
assertDoesNotThrow(() -> {
- DefaultMessageStore store = new DefaultMessageStore(
- new MessageStoreConfig(),
- new BrokerStatsManager(new BrokerConfig()),
- null,
- new BrokerConfig(),
- new ConcurrentHashMap<>()
- );
-
- IndexService indexService = new IndexService(store);
indexService.queryOffset("test", "", Integer.MAX_VALUE, 10, 100);
});
}
+ @Test
+ public void testQueryOffsetWithoutIndexType() {
+ QueryOffsetResult result = indexService.queryOffset("test", "testKey",
10, 0, 100);
+ assertNotNull(result);
+ assertEquals(Collections.emptyList(), result.getPhyOffsets());
+ }
+
+ @Test
+ public void testQueryOffsetWithIndexType() {
+ QueryOffsetResult result = indexService.queryOffset("test", "testKey",
10, 0, 100, "TAG");
+ assertNotNull(result);
+ assertEquals(Collections.emptyList(), result.getPhyOffsets());
+ }
+
+ @Test
+ public void testQueryOffsetWithNullKey() {
+ QueryOffsetResult result = indexService.queryOffset("test", null, 10,
0, 100);
+ assertNotNull(result);
+ assertEquals(Collections.emptyList(), result.getPhyOffsets());
+ }
+
+ @Test
+ public void testQueryOffsetWithZeroMaxNum() {
+ QueryOffsetResult result = indexService.queryOffset("test", "testKey",
0, 0, 100);
+ assertNotNull(result);
+ assertEquals(Collections.emptyList(), result.getPhyOffsets());
+ }
}