lizhimins commented on code in PR #7594:
URL: https://github.com/apache/rocketmq/pull/7594#discussion_r1413454902


##########
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java:
##########
@@ -303,74 +270,94 @@ public CompletableFuture<GetMessageResult> 
getMessageFromCacheAsync(CompositeQue
 
         recordCacheAccess(flatFile, group, queueOffset, resultWrapperList);
 
-        // if cache hit, result will be returned immediately and 
asynchronously prefetch messages for later requests
-        if (!resultWrapperList.isEmpty()) {
-            LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit: 
" +
-                    "topic: {}, queue: {}, queue offset: {}, max message num: 
{}, cache hit num: {}",
-                mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, 
resultWrapperList.size());
-            prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1);
+        if (resultWrapperList.isEmpty()) {
+            // If cache miss, pull messages immediately
+            LOGGER.info("MessageFetcher cache miss, group: {}, topic: {}, 
queueId: {}, offset: {}, maxCount: {}",
+                group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
+        } else {
+            // If cache hit, return buffer result immediately and 
asynchronously prefetch messages
+            LOGGER.debug("MessageFetcher cache hit, group: {}, topic: {}, 
queueId: {}, offset: {}, maxCount: {}, resultSize: {}",
+                group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, 
resultWrapperList.size());
 
-            GetMessageResult result = new GetMessageResult();
+            GetMessageResultExt result = new GetMessageResultExt();
             result.setStatus(GetMessageStatus.FOUND);
             result.setMinOffset(flatFile.getConsumeQueueMinOffset());
             result.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
             result.setNextBeginOffset(queueOffset + resultWrapperList.size());
-            resultWrapperList.forEach(wrapper -> 
result.addMessage(wrapper.getDuplicateResult(), wrapper.getCurOffset()));
+            resultWrapperList.forEach(wrapper -> result.addMessageExt(
+                wrapper.getDuplicateResult(), wrapper.getOffset(), 
wrapper.getTagCode()));
+
+            if (lastGetOffset < result.getMaxOffset()) {
+                this.prefetchMessage(flatFile, group, maxCount, lastGetOffset 
+ 1);
+            }
             return CompletableFuture.completedFuture(result);
         }
 
-        // if cache is miss, immediately pull messages
-        LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache 
miss: " +
-                "topic: {}, queue: {}, queue offset: {}, max message num: {}",
-            mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
-
-        CompletableFuture<GetMessageResult> resultFuture;
+        CompletableFuture<GetMessageResultExt> resultFuture;
         synchronized (flatFile) {
             int batchSize = maxCount * storeConfig.getReadAheadMinFactor();
             resultFuture = getMessageFromTieredStoreAsync(flatFile, 
queueOffset, batchSize)
-                .thenApplyAsync(result -> {
+                .thenApply(result -> {
                     if (result.getStatus() != GetMessageStatus.FOUND) {
                         return result;
                     }
-                    GetMessageResult newResult = new GetMessageResult();
-                    newResult.setStatus(GetMessageStatus.FOUND);
-                    
newResult.setMinOffset(flatFile.getConsumeQueueMinOffset());
-                    
newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
 
+                    GetMessageResultExt newResult = new GetMessageResultExt();
                     List<Long> offsetList = result.getMessageQueueOffset();
+                    List<Long> tagCodeList = result.getTagCodeList();
                     List<SelectMappedBufferResult> msgList = 
result.getMessageMapedList();
-                    Long minOffset = offsetList.get(0);
-                    Long maxOffset = offsetList.get(offsetList.size() - 1);
-                    int size = offsetList.size();
+
                     for (int i = 0; i < offsetList.size(); i++) {
-                        Long offset = offsetList.get(i);
                         SelectMappedBufferResult msg = msgList.get(i);
-                        // put message into cache
-                        SelectMappedBufferResultWrapper resultWrapper = 
putMessageToCache(flatFile, offset, msg, minOffset, maxOffset, size, true);
-                        // try to meet maxCount
+                        SelectBufferResultWrapper bufferResult = new 
SelectBufferResultWrapper(
+                            msg, offsetList.get(i), tagCodeList.get(i), true);
+                        this.putMessageToCache(flatFile, bufferResult);
                         if (newResult.getMessageMapedList().size() < maxCount) 
{
-                            
newResult.addMessage(resultWrapper.getDuplicateResult(), offset);
+                            newResult.addMessageExt(msg, offsetList.get(i), 
tagCodeList.get(i));
                         }
                     }
+
+                    newResult.setStatus(GetMessageStatus.FOUND);
+                    
newResult.setMinOffset(flatFile.getConsumeQueueMinOffset());
+                    
newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
                     newResult.setNextBeginOffset(queueOffset + 
newResult.getMessageMapedList().size());
                     return newResult;
-                }, TieredStoreExecutor.fetchDataExecutor);
+                });
 
             List<Pair<Integer, CompletableFuture<Long>>> futureList = new 
ArrayList<>();
             CompletableFuture<Long> inflightRequestFuture = 
resultFuture.thenApply(result ->
-                result.getStatus() == GetMessageStatus.FOUND ? 
result.getMessageQueueOffset().get(result.getMessageQueueOffset().size() - 1) : 
-1L);
+                result.getStatus() == GetMessageStatus.FOUND ?
+                    
result.getMessageQueueOffset().get(result.getMessageQueueOffset().size() - 1) : 
-1L);

Review Comment:
   
![image](https://github.com/apache/rocketmq/assets/22487634/0d57a746-dcd2-4b09-83d5-370ee43aa8e0)
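   This may not be concise enough: the ternary above calls `result.getMessageQueueOffset()` twice just to read the last offset, so it could be simplified (see the screenshot above).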



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to