This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 26e1e60ca4d01201afd70a7c52872b15607005c6
Author: Benoit TELLIER <[email protected]>
AuthorDate: Tue Jan 27 12:20:14 2026 +0100

    [ENHANCEMENT] Plug IMAP audit trail onto the MDC
---
 .../org/apache/james/mailbox/store/StoreMailboxManager.java    |  9 +++++----
 .../james/imap/processor/AbstractMessageRangeProcessor.java    |  4 ++--
 .../java/org/apache/james/imap/processor/DeleteProcessor.java  |  2 +-
 .../java/org/apache/james/imap/processor/ExpungeProcessor.java |  5 +++--
 .../org/apache/james/imap/processor/fetch/FetchProcessor.java  |  2 +-
 .../util/src/main/java/org/apache/james/util/ReactorUtils.java |  8 ++++++--
 .../src/test/java/org/apache/james/util/ReactorUtilsTest.java  | 10 +++-------
 7 files changed, 21 insertions(+), 19 deletions(-)

diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index 16bf7c12cd..b4a92f45e7 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -89,6 +89,7 @@ import org.apache.james.mailbox.store.user.SubscriptionMapper;
 import org.apache.james.mailbox.store.user.model.Subscription;
 import org.apache.james.util.AuditTrail;
 import org.apache.james.util.FunctionalUtils;
+import org.apache.james.util.ReactorUtils;
 import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -380,14 +381,14 @@ public class StoreMailboxManager implements 
MailboxManager {
                         .modifyErrorFilter(old -> old.and(e -> !(e instanceof 
MailboxException)))
                         .jitter(0.5)
                         .maxBackoff(Duration.ofSeconds(1)))
-                    .doOnSuccess(mailboxId -> AuditTrail.entry()
+                    .flatMap(mailboxId -> ReactorUtils.logAsMono(() -> 
AuditTrail.entry()
                         .username(() -> 
sanitizedMailboxPath.getUser().asString())
                         .sessionId(() -> 
String.valueOf(mailboxSession.getSessionId().getValue()))
                         .protocol("mailbox")
                         .action("create")
                         .parameters(Throwing.supplier(() -> 
ImmutableMap.of("mailboxId", mailboxId.serialize(),
                             "mailboxPath", sanitizedMailboxPath.asString())))
-                        .log("Mailbox Create"));
+                        .log("Mailbox Create")).thenReturn(mailboxId));
             } catch (MailboxNameException e) {
                 return Mono.error(e);
             }
@@ -682,7 +683,7 @@ public class StoreMailboxManager implements MailboxManager {
 
         return mapper.executeReactive(fromMailboxPublisher
             .flatMap(mailbox -> doRenameMailbox(mailbox, to, fromSession, 
toSession, mapper)
-                .doOnSuccess(any -> AuditTrail.entry()
+                .doOnEach(ReactorUtils.logFinally(() -> AuditTrail.entry()
                     .username(() -> fromSession.getUser().asString())
                     .sessionId(() -> 
String.valueOf(fromSession.getSessionId().getValue()))
                     .protocol("mailbox")
@@ -690,7 +691,7 @@ public class StoreMailboxManager implements MailboxManager {
                     .parameters(Throwing.supplier(() -> 
ImmutableMap.of("mailboxId", mailbox.getMailboxId().serialize(),
                         "fromMailboxPath", 
mailbox.generateAssociatedPath().asString(),
                         "toMailboxPath", to.asString())))
-                    .log("Mailbox Rename"))
+                    .log("Mailbox Rename")))
                 .flatMap(renamedResults -> 
renameSubscriptionsIfNeeded(renamedResults, option, fromSession, toSession))));
     }
 
diff --git 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMessageRangeProcessor.java
 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMessageRangeProcessor.java
index d84f430ece..0c8aa5b8e8 100644
--- 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMessageRangeProcessor.java
+++ 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMessageRangeProcessor.java
@@ -121,7 +121,7 @@ public abstract class AbstractMessageRangeProcessor<R 
extends AbstractMessageRan
                             .sneakyThrow())
                         .filter(Objects::nonNull)
                         .concatMap(range -> process(target.getId(), 
session.getSelected(), mailboxSession, range)
-                            .doOnNext(result -> AuditTrail.entry()
+                            .doOnEach(ReactorUtils.logFinally(() -> 
AuditTrail.entry()
                                 .username(() -> 
mailboxSession.getUser().asString())
                                 .sessionId(() -> 
session.sessionId().asString())
                                 .protocol("IMAP")
@@ -130,7 +130,7 @@ public abstract class AbstractMessageRangeProcessor<R 
extends AbstractMessageRan
                                     "targetId", target.getId().serialize(),
                                     "selectedMailboxId", 
session.getSelected().getMailboxId().serialize(),
                                     "range", range.getUidFrom().asLong() + ":" 
+ range.getUidTo().asLong()))
-                                .log("IMAP " + getOperationName() + " 
succeeded."))
+                                .log("IMAP " + getOperationName() + " 
succeeded.")))
                             .map(IdRange::from))
                         .collect(ImmutableList.<IdRange>toImmutableList())
                         .map(IdRange::mergeRanges)
diff --git 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/DeleteProcessor.java
 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/DeleteProcessor.java
index acfef263e2..60f0258d0f 100644
--- 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/DeleteProcessor.java
+++ 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/DeleteProcessor.java
@@ -72,7 +72,7 @@ public class DeleteProcessor extends 
AbstractMailboxProcessor<DeleteRequest> {
             .then(unsolicitedResponses(session, responder, false))
             .then(Mono.fromRunnable(() -> okComplete(request, responder)))
             .then()
-            .doOnSuccess(any -> auditTrail(session, mailboxPath))
+            .doOnEach(ReactorUtils.logFinally(() -> auditTrail(session, 
mailboxPath)))
             .onErrorResume(MailboxNotFoundException.class, e -> {
                 no(request, responder, 
HumanReadableText.FAILURE_NO_SUCH_MAILBOX);
                 return ReactorUtils.logAsMono(() -> LOGGER.debug("Delete 
failed for mailbox {} as it doesn't exist", mailboxPath, e));
diff --git 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/ExpungeProcessor.java
 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/ExpungeProcessor.java
index 3e51eb874e..abdcb4cf3d 100644
--- 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/ExpungeProcessor.java
+++ 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/ExpungeProcessor.java
@@ -129,7 +129,7 @@ public class ExpungeProcessor extends 
AbstractMailboxProcessor<ExpungeRequest> i
             .doOnNext(selected::removeRecent)
             .count()
             .map(Long::intValue)
-            .doOnSuccess(count -> AuditTrail.entry()
+            .flatMap(count -> ReactorUtils.logAsMono(() -> AuditTrail.entry()
                 .username(() -> mailboxSession.getUser().asString())
                 .sessionId(() -> session.sessionId().asString())
                 .protocol("IMAP")
@@ -137,7 +137,8 @@ public class ExpungeProcessor extends 
AbstractMailboxProcessor<ExpungeRequest> i
                 .parameters(() -> ImmutableMap.of("loggedInUser", 
mailboxSession.getLoggedInUser().map(Username::asString).orElse(""),
                     "mailboxId", mailbox.getId().serialize(),
                     "messageUids", range.toString()))
-                .log(String.format("IMAP EXPUNGE succeeded. %d message 
deleted.", count)));
+                .log(String.format("IMAP EXPUNGE succeeded. %d message 
deleted.", count)))
+                .thenReturn(count));
     }
 
     @Override
diff --git 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
index 0ab9930ebb..b785b6b6ac 100644
--- 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
+++ 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java
@@ -362,7 +362,7 @@ public class FetchProcessor extends 
AbstractMailboxProcessor<FetchRequest> {
                     .subscribe(fetchSubscriber);
             } else {
                 Flux.fromIterable(consolidate(selected, ranges, fetch))
-                    .doOnNext(range -> auditTrail(mailbox, mailboxSession, 
resultToFetch, range))
+                    .flatMap(range -> ReactorUtils.logAsMono(() -> 
auditTrail(mailbox, mailboxSession, resultToFetch, range)).thenReturn(range))
                     .concatMap(range -> 
Flux.from(mailbox.getMessagesReactive(range, resultToFetch, mailboxSession)))
                     .filter(ids -> !fetch.contains(Item.MODSEQ) || 
ids.getModSeq().asLong() > fetch.getChangedSince())
                     .concatMap(result -> toResponse(mailbox, fetch, 
mailboxSession, selected, result))
diff --git 
a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java 
b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index 8c26be6382..0f69d237cf 100644
--- 
a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ 
b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -190,8 +190,12 @@ public class ReactorUtils {
         };
     }
 
-    public static Consumer<Signal<?>> log(Runnable logStatement) {
-        return signal -> logWithContext(logStatement, signal.getContextView());
+    public static Consumer<Signal<?>> logFinally(Runnable logStatement) {
+        return signal -> {
+            if (signal.isOnComplete()) {
+                logWithContext(logStatement, signal.getContextView());
+            }
+        };
     }
 
     private static void logWithContext(Runnable logStatement, ContextView 
contextView) {
diff --git 
a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
 
b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index ca5a1c3e28..bd4693b18e 100644
--- 
a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ 
b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -746,9 +746,7 @@ class ReactorUtilsTest {
             String key = "key";
 
             Flux.just(1)
-                .doOnEach(ReactorUtils.log(() -> {
-                    assertThat(MDC.get(key)).isEqualTo(value);
-                }))
+                .doOnEach(ReactorUtils.logFinally(() -> 
assertThat(MDC.get(key)).isEqualTo(value)))
                 .contextWrite(ReactorUtils.context("test", 
MDCBuilder.ofValue(key, value)))
                 .blockLast();
         }
@@ -760,9 +758,7 @@ class ReactorUtilsTest {
             String key = "key";
 
             Flux.just(1)
-                .doOnEach(ReactorUtils.log(() -> {
-                    assertThat(MDC.get(key)).isEqualTo(value1);
-                }))
+                .doOnEach(ReactorUtils.logFinally(() -> 
assertThat(MDC.get(key)).isEqualTo(value1)))
                 .contextWrite(ReactorUtils.context("test", 
MDCBuilder.ofValue(key, value1)))
                 .contextWrite(ReactorUtils.context("test", 
MDCBuilder.ofValue(key, value2)))
                 .blockLast();
@@ -776,7 +772,7 @@ class ReactorUtilsTest {
             String key2 = "key2";
 
             Flux.just(1)
-                .doOnEach(ReactorUtils.log(() -> {
+                .doOnEach(ReactorUtils.logFinally(() -> {
                     assertThat(MDC.get(key1)).isEqualTo(value1);
                     assertThat(MDC.get(key2)).isEqualTo(value2);
                 }))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to