This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit dcaaab04c038f8ed520e07e363fe0bf980e3349f
Author: Benoit TELLIER <[email protected]>
AuthorDate: Tue Jan 27 16:01:26 2026 +0100

    [ENHANCEMENT] Metrics and logs for FETCH local cache
---
 .../james/imap/processor/fetch/FetchProcessor.java | 53 ++++++++++++++++------
 1 file changed, 39 insertions(+), 14 deletions(-)

diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
index b785b6b6ac..2d1d8de2cd 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
@@ -65,6 +65,7 @@ import org.apache.james.mailbox.model.FetchGroup;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.mailbox.model.MessageResult;
+import org.apache.james.metrics.api.Metric;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.util.AuditTrail;
 import org.apache.james.util.DurationParser;
@@ -235,12 +236,19 @@ public class FetchProcessor extends AbstractMailboxProcessor<FetchRequest> {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(FetchProcessor.class);
 
     private final LocalCacheConfiguration localCacheConfiguration;
+    private final Metric localCacheHitMetric;
+    private final Metric localCacheMissMetric;
 
     @Inject
     public FetchProcessor(MailboxManager mailboxManager, StatusResponseFactory 
factory,
                           MetricFactory metricFactory, LocalCacheConfiguration 
localCacheConfiguration) {
         super(FetchRequest.class, mailboxManager, factory, metricFactory);
         this.localCacheConfiguration = localCacheConfiguration;
+
+        this.localCacheHitMetric = 
metricFactory.generate("imap-fetch-local-cache-hit");
+        this.localCacheMissMetric = 
metricFactory.generate("imap-fetch-local-cache-miss");
+
+        LOGGER.info("IMAP Fetch local cache configuration {}", 
localCacheConfiguration);
     }
 
     @Override
@@ -352,27 +360,44 @@ public class FetchProcessor extends AbstractMailboxProcessor<FetchRequest> {
             boolean singleMessage = ranges.size() == 1 && 
ranges.getFirst().getUidFrom().equals(ranges.getFirst().getUidTo());
             boolean shouldCache = 
fetch.getBodyElements().stream().anyMatch(bodyFetchElement -> 
bodyFetchElement.getNumberOfOctets() != null);
             if (singleMessage && shouldCache) {
-                DefaultLocalMessageCache localMessageCache = new 
DefaultLocalMessageCache(imapSession, localCacheConfiguration);
-                localMessageCache.lookupInCache(mailbox.getId(), 
ranges.getFirst().getUidFrom(), resultToFetch)
-                    .map(Flux::just)
-                    .orElseGet(() -> 
Flux.from(mailbox.getMessagesReactive(ranges.getFirst(), resultToFetch, 
mailboxSession))
-                        .doOnNext(message -> 
localMessageCache.saveForLater(message, resultToFetch)))
-                    .filter(ids -> !fetch.contains(Item.MODSEQ) || 
ids.getModSeq().asLong() > fetch.getChangedSince())
-                    .concatMap(result -> toResponse(mailbox, fetch, 
mailboxSession, selected, result))
-                    .subscribe(fetchSubscriber);
+                publishMessagesWithCache(selected, mailbox, ranges, fetch, 
mailboxSession, imapSession, resultToFetch, fetchSubscriber);
             } else {
-                Flux.fromIterable(consolidate(selected, ranges, fetch))
-                    .flatMap(range -> ReactorUtils.logAsMono(() -> 
auditTrail(mailbox, mailboxSession, resultToFetch, range)).thenReturn(range))
-                    .concatMap(range -> 
Flux.from(mailbox.getMessagesReactive(range, resultToFetch, mailboxSession)))
-                    .filter(ids -> !fetch.contains(Item.MODSEQ) || 
ids.getModSeq().asLong() > fetch.getChangedSince())
-                    .concatMap(result -> toResponse(mailbox, fetch, 
mailboxSession, selected, result))
-                    .subscribe(fetchSubscriber);
+                publishMessagesWithoutCache(selected, mailbox, ranges, fetch, 
mailboxSession, resultToFetch, fetchSubscriber);
             }
 
             return fetchSubscriber.completionMono();
         }
     }
 
+    private void publishMessagesWithCache(SelectedMailbox selected, 
MessageManager mailbox, List<MessageRange> ranges, FetchData fetch, 
MailboxSession mailboxSession, ImapSession imapSession, FetchGroup 
resultToFetch, FetchSubscriber fetchSubscriber) {
+        DefaultLocalMessageCache localMessageCache = new 
DefaultLocalMessageCache(imapSession, localCacheConfiguration);
+        localMessageCache.lookupInCache(mailbox.getId(), 
ranges.getFirst().getUidFrom(), resultToFetch)
+            .map(result -> {
+                localCacheHitMetric.increment();
+                return Flux.just(result).doOnEach(ReactorUtils.logFinally(() 
-> LOGGER.debug("Cached partial fetch reused")));
+            })
+            .orElseGet(() -> ReactorUtils.logAsMono(() -> auditTrail(mailbox, 
mailboxSession, resultToFetch, ranges.getFirst()))
+                
.thenMany(Flux.from(mailbox.getMessagesReactive(ranges.getFirst(), 
resultToFetch, mailboxSession))
+                    .doOnNext(message ->  {
+                        if (localCacheConfiguration.enabled) {
+                            localCacheMissMetric.increment();
+                            localMessageCache.saveForLater(message, 
resultToFetch);
+                        }
+                    })))
+            .filter(ids -> !fetch.contains(Item.MODSEQ) || 
ids.getModSeq().asLong() > fetch.getChangedSince())
+            .concatMap(result -> toResponse(mailbox, fetch, mailboxSession, 
selected, result))
+            .subscribe(fetchSubscriber);
+    }
+
+    private void publishMessagesWithoutCache(SelectedMailbox selected, 
MessageManager mailbox, List<MessageRange> ranges, FetchData fetch, 
MailboxSession mailboxSession, FetchGroup resultToFetch, FetchSubscriber 
fetchSubscriber) {
+        Flux.fromIterable(consolidate(selected, ranges, fetch))
+            .flatMap(range -> ReactorUtils.logAsMono(() -> auditTrail(mailbox, 
mailboxSession, resultToFetch, range)).thenReturn(range))
+            .concatMap(range -> Flux.from(mailbox.getMessagesReactive(range, 
resultToFetch, mailboxSession)))
+            .filter(ids -> !fetch.contains(Item.MODSEQ) || 
ids.getModSeq().asLong() > fetch.getChangedSince())
+            .concatMap(result -> toResponse(mailbox, fetch, mailboxSession, 
selected, result))
+            .subscribe(fetchSubscriber);
+    }
+
     List<MessageRange> consolidate(SelectedMailbox selected, 
List<MessageRange> ranges, FetchData fetchData) {
         if (fetchData.getPartialRange().isEmpty()) {
             return ranges;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to