This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit dcaaab04c038f8ed520e07e363fe0bf980e3349f Author: Benoit TELLIER <[email protected]> AuthorDate: Tue Jan 27 16:01:26 2026 +0100 [ENHANCEMENT] Metrics and logs for FETCH local cache --- .../james/imap/processor/fetch/FetchProcessor.java | 53 ++++++++++++++++------ 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java index b785b6b6ac..2d1d8de2cd 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java @@ -65,6 +65,7 @@ import org.apache.james.mailbox.model.FetchGroup; import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.model.MessageRange; import org.apache.james.mailbox.model.MessageResult; +import org.apache.james.metrics.api.Metric; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.util.AuditTrail; import org.apache.james.util.DurationParser; @@ -235,12 +236,19 @@ public class FetchProcessor extends AbstractMailboxProcessor<FetchRequest> { private static final Logger LOGGER = LoggerFactory.getLogger(FetchProcessor.class); private final LocalCacheConfiguration localCacheConfiguration; + private final Metric localCacheHitMetric; + private final Metric localCacheMissMetric; @Inject public FetchProcessor(MailboxManager mailboxManager, StatusResponseFactory factory, MetricFactory metricFactory, LocalCacheConfiguration localCacheConfiguration) { super(FetchRequest.class, mailboxManager, factory, metricFactory); this.localCacheConfiguration = localCacheConfiguration; + + this.localCacheHitMetric = metricFactory.generate("imap-fetch-local-cache-hit"); + this.localCacheMissMetric = metricFactory.generate("imap-fetch-local-cache-miss"); + + LOGGER.info("IMAP Fetch local cache configuration {}", localCacheConfiguration); } @Override @@ -352,27 +360,44 @@ public class FetchProcessor extends AbstractMailboxProcessor<FetchRequest> { boolean singleMessage = ranges.size() == 1 && ranges.getFirst().getUidFrom().equals(ranges.getFirst().getUidTo()); boolean shouldCache = fetch.getBodyElements().stream().anyMatch(bodyFetchElement -> bodyFetchElement.getNumberOfOctets() != null); if (singleMessage && shouldCache) { - DefaultLocalMessageCache localMessageCache = new DefaultLocalMessageCache(imapSession, localCacheConfiguration); - localMessageCache.lookupInCache(mailbox.getId(), ranges.getFirst().getUidFrom(), resultToFetch) - .map(Flux::just) - .orElseGet(() -> Flux.from(mailbox.getMessagesReactive(ranges.getFirst(), resultToFetch, mailboxSession)) - .doOnNext(message -> localMessageCache.saveForLater(message, resultToFetch))) - .filter(ids -> !fetch.contains(Item.MODSEQ) || ids.getModSeq().asLong() > fetch.getChangedSince()) - .concatMap(result -> toResponse(mailbox, fetch, mailboxSession, selected, result)) - .subscribe(fetchSubscriber); + publishMessagesWithCache(selected, mailbox, ranges, fetch, mailboxSession, imapSession, resultToFetch, fetchSubscriber); } else { - Flux.fromIterable(consolidate(selected, ranges, fetch)) - .flatMap(range -> ReactorUtils.logAsMono(() -> auditTrail(mailbox, mailboxSession, resultToFetch, range)).thenReturn(range)) - .concatMap(range -> Flux.from(mailbox.getMessagesReactive(range, resultToFetch, mailboxSession))) - .filter(ids -> !fetch.contains(Item.MODSEQ) || ids.getModSeq().asLong() > fetch.getChangedSince()) - .concatMap(result -> toResponse(mailbox, fetch, mailboxSession, selected, result)) - .subscribe(fetchSubscriber); + publishMessagesWithoutCache(selected, mailbox, ranges, fetch, mailboxSession, resultToFetch, fetchSubscriber); } return fetchSubscriber.completionMono(); } } + private void publishMessagesWithCache(SelectedMailbox selected, MessageManager mailbox, List<MessageRange> ranges, FetchData fetch, MailboxSession mailboxSession, ImapSession imapSession, FetchGroup resultToFetch, FetchSubscriber fetchSubscriber) { + DefaultLocalMessageCache localMessageCache = new DefaultLocalMessageCache(imapSession, localCacheConfiguration); + localMessageCache.lookupInCache(mailbox.getId(), ranges.getFirst().getUidFrom(), resultToFetch) + .map(result -> { + localCacheHitMetric.increment(); + return Flux.just(result).doOnEach(ReactorUtils.logFinally(() -> LOGGER.debug("Cached partial fetch reused"))); + }) + .orElseGet(() -> ReactorUtils.logAsMono(() -> auditTrail(mailbox, mailboxSession, resultToFetch, ranges.getFirst())) + .thenMany(Flux.from(mailbox.getMessagesReactive(ranges.getFirst(), resultToFetch, mailboxSession)) + .doOnNext(message -> { + if (localCacheConfiguration.enabled) { + localCacheMissMetric.increment(); + localMessageCache.saveForLater(message, resultToFetch); + } + }))) + .filter(ids -> !fetch.contains(Item.MODSEQ) || ids.getModSeq().asLong() > fetch.getChangedSince()) + .concatMap(result -> toResponse(mailbox, fetch, mailboxSession, selected, result)) + .subscribe(fetchSubscriber); + } + + private void publishMessagesWithoutCache(SelectedMailbox selected, MessageManager mailbox, List<MessageRange> ranges, FetchData fetch, MailboxSession mailboxSession, FetchGroup resultToFetch, FetchSubscriber fetchSubscriber) { + Flux.fromIterable(consolidate(selected, ranges, fetch)) + .flatMap(range -> ReactorUtils.logAsMono(() -> auditTrail(mailbox, mailboxSession, resultToFetch, range)).thenReturn(range)) + .concatMap(range -> Flux.from(mailbox.getMessagesReactive(range, resultToFetch, mailboxSession))) + .filter(ids -> !fetch.contains(Item.MODSEQ) || ids.getModSeq().asLong() > fetch.getChangedSince()) + .concatMap(result -> toResponse(mailbox, fetch, mailboxSession, selected, result)) + .subscribe(fetchSubscriber); + } + List<MessageRange> consolidate(SelectedMailbox selected, List<MessageRange> ranges, FetchData fetchData) { if (fetchData.getPartialRange().isEmpty()) { return ranges; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
