lizhimins commented on code in PR #7594: URL: https://github.com/apache/rocketmq/pull/7594#discussion_r1413454902
########## tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java: ########## @@ -303,74 +270,94 @@ public CompletableFuture<GetMessageResult> getMessageFromCacheAsync(CompositeQue recordCacheAccess(flatFile, group, queueOffset, resultWrapperList); - // if cache hit, result will be returned immediately and asynchronously prefetch messages for later requests - if (!resultWrapperList.isEmpty()) { - LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit: " + - "topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit num: {}", - mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size()); - prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1); + if (resultWrapperList.isEmpty()) { + // If cache miss, pull messages immediately + LOGGER.info("MessageFetcher cache miss, group: {}, topic: {}, queueId: {}, offset: {}, maxCount: {}", + group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount); + } else { + // If cache hit, return buffer result immediately and asynchronously prefetch messages + LOGGER.debug("MessageFetcher cache hit, group: {}, topic: {}, queueId: {}, offset: {}, maxCount: {}, resultSize: {}", + group, mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size()); - GetMessageResult result = new GetMessageResult(); + GetMessageResultExt result = new GetMessageResultExt(); result.setStatus(GetMessageStatus.FOUND); result.setMinOffset(flatFile.getConsumeQueueMinOffset()); result.setMaxOffset(flatFile.getConsumeQueueCommitOffset()); result.setNextBeginOffset(queueOffset + resultWrapperList.size()); - resultWrapperList.forEach(wrapper -> result.addMessage(wrapper.getDuplicateResult(), wrapper.getCurOffset())); + resultWrapperList.forEach(wrapper -> result.addMessageExt( + wrapper.getDuplicateResult(), wrapper.getOffset(), wrapper.getTagCode())); + + if (lastGetOffset < result.getMaxOffset()) { + this.prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1); + } return CompletableFuture.completedFuture(result); } - // if cache is miss, immediately pull messages - LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " + - "topic: {}, queue: {}, queue offset: {}, max message num: {}", - mq.getTopic(), mq.getQueueId(), queueOffset, maxCount); - - CompletableFuture<GetMessageResult> resultFuture; + CompletableFuture<GetMessageResultExt> resultFuture; synchronized (flatFile) { int batchSize = maxCount * storeConfig.getReadAheadMinFactor(); resultFuture = getMessageFromTieredStoreAsync(flatFile, queueOffset, batchSize) - .thenApplyAsync(result -> { + .thenApply(result -> { if (result.getStatus() != GetMessageStatus.FOUND) { return result; } - GetMessageResult newResult = new GetMessageResult(); - newResult.setStatus(GetMessageStatus.FOUND); - newResult.setMinOffset(flatFile.getConsumeQueueMinOffset()); - newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset()); + GetMessageResultExt newResult = new GetMessageResultExt(); List<Long> offsetList = result.getMessageQueueOffset(); + List<Long> tagCodeList = result.getTagCodeList(); List<SelectMappedBufferResult> msgList = result.getMessageMapedList(); - Long minOffset = offsetList.get(0); - Long maxOffset = offsetList.get(offsetList.size() - 1); - int size = offsetList.size(); + for (int i = 0; i < offsetList.size(); i++) { - Long offset = offsetList.get(i); SelectMappedBufferResult msg = msgList.get(i); - // put message into cache - SelectMappedBufferResultWrapper resultWrapper = putMessageToCache(flatFile, offset, msg, minOffset, maxOffset, size, true); - // try to meet maxCount + SelectBufferResultWrapper bufferResult = new SelectBufferResultWrapper( + msg, offsetList.get(i), tagCodeList.get(i), true); + this.putMessageToCache(flatFile, bufferResult); if (newResult.getMessageMapedList().size() < maxCount) { - newResult.addMessage(resultWrapper.getDuplicateResult(), offset); + newResult.addMessageExt(msg, offsetList.get(i), tagCodeList.get(i)); } } + + newResult.setStatus(GetMessageStatus.FOUND); + newResult.setMinOffset(flatFile.getConsumeQueueMinOffset()); + newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset()); newResult.setNextBeginOffset(queueOffset + newResult.getMessageMapedList().size()); return newResult; - }, TieredStoreExecutor.fetchDataExecutor); + }); List<Pair<Integer, CompletableFuture<Long>>> futureList = new ArrayList<>(); CompletableFuture<Long> inflightRequestFuture = resultFuture.thenApply(result -> - result.getStatus() == GetMessageStatus.FOUND ? result.getMessageQueueOffset().get(result.getMessageQueueOffset().size() - 1) : -1L); + result.getStatus() == GetMessageStatus.FOUND ? + result.getMessageQueueOffset().get(result.getMessageQueueOffset().size() - 1) : -1L); Review Comment: ![image](https://github.com/apache/rocketmq/assets/22487634/0d57a746-dcd2-4b09-83d5-370ee43aa8e0) This may not be concise enough. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org