This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new ebf1595418 [ISSUE #10103] Improve batch polling efficiency in pollIndexRecord method (#10104)
ebf1595418 is described below

commit ebf1595418178d5c73ba80ab08fb3a7312efc25e
Author: yx9o <[email protected]>
AuthorDate: Tue Mar 17 16:22:28 2026 +0800

    [ISSUE #10103] Improve batch polling efficiency in pollIndexRecord method (#10104)
---
 .../store/index/rocksdb/IndexRocksDBStore.java     | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/index/rocksdb/IndexRocksDBStore.java b/store/src/main/java/org/apache/rocketmq/store/index/rocksdb/IndexRocksDBStore.java
index 202cf542b0..38303bf504 100644
--- a/store/src/main/java/org/apache/rocketmq/store/index/rocksdb/IndexRocksDBStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/index/rocksdb/IndexRocksDBStore.java
@@ -321,18 +321,18 @@ public class IndexRocksDBStore implements CommitLogDispatchStore {
         private void pollIndexRecord() {
             try {
                 IndexRocksDBRecord firstReq = originIndexMsgQueue.poll(100, 
TimeUnit.MILLISECONDS);
-                if (null != firstReq) {
-                    irs.add(firstReq);
-                    while (true) {
-                        IndexRocksDBRecord tmpReq = 
originIndexMsgQueue.poll(100, TimeUnit.MILLISECONDS);
-                        if (null == tmpReq) {
-                            break;
-                        }
-                        irs.add(tmpReq);
-                        if (irs.size() >= BATCH_SIZE) {
-                            break;
-                        }
+                if (firstReq == null) {
+                    return;
+                }
+                irs.add(firstReq);
+                originIndexMsgQueue.drainTo(irs, BATCH_SIZE - irs.size());
+                while (irs.size() < BATCH_SIZE) {
+                    IndexRocksDBRecord tmpReq = originIndexMsgQueue.poll(100, 
TimeUnit.MILLISECONDS);
+                    if (tmpReq == null) {
+                        break;
                     }
+                    irs.add(tmpReq);
+                    originIndexMsgQueue.drainTo(irs, BATCH_SIZE - irs.size());
                 }
             } catch (Exception e) {
                 logError.error("IndexRocksDBStore IndexBuildService error: 
{}", e.getMessage());

Reply via email to