This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 26e1e60ca4d01201afd70a7c52872b15607005c6 Author: Benoit TELLIER <[email protected]> AuthorDate: Tue Jan 27 12:20:14 2026 +0100 [ENHANCEMENT] Plug IMAP audit trail onto the MDC --- .../org/apache/james/mailbox/store/StoreMailboxManager.java | 9 +++++---- .../james/imap/processor/AbstractMessageRangeProcessor.java | 4 ++-- .../java/org/apache/james/imap/processor/DeleteProcessor.java | 2 +- .../java/org/apache/james/imap/processor/ExpungeProcessor.java | 5 +++-- .../org/apache/james/imap/processor/fetch/FetchProcessor.java | 2 +- .../util/src/main/java/org/apache/james/util/ReactorUtils.java | 8 ++++++-- .../src/test/java/org/apache/james/util/ReactorUtilsTest.java | 10 +++------- 7 files changed, 21 insertions(+), 19 deletions(-) diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java index 16bf7c12cd..b4a92f45e7 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java @@ -89,6 +89,7 @@ import org.apache.james.mailbox.store.user.SubscriptionMapper; import org.apache.james.mailbox.store.user.model.Subscription; import org.apache.james.util.AuditTrail; import org.apache.james.util.FunctionalUtils; +import org.apache.james.util.ReactorUtils; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -380,14 +381,14 @@ public class StoreMailboxManager implements MailboxManager { .modifyErrorFilter(old -> old.and(e -> !(e instanceof MailboxException))) .jitter(0.5) .maxBackoff(Duration.ofSeconds(1))) - .doOnSuccess(mailboxId -> AuditTrail.entry() + .flatMap(mailboxId -> ReactorUtils.logAsMono(() -> AuditTrail.entry() .username(() -> sanitizedMailboxPath.getUser().asString()) .sessionId(() -> String.valueOf(mailboxSession.getSessionId().getValue())) .protocol("mailbox") .action("create") .parameters(Throwing.supplier(() -> ImmutableMap.of("mailboxId", mailboxId.serialize(), "mailboxPath", sanitizedMailboxPath.asString()))) - .log("Mailbox Create")); + .log("Mailbox Create")).thenReturn(mailboxId)); } catch (MailboxNameException e) { return Mono.error(e); } @@ -682,7 +683,7 @@ public class StoreMailboxManager implements MailboxManager { return mapper.executeReactive(fromMailboxPublisher .flatMap(mailbox -> doRenameMailbox(mailbox, to, fromSession, toSession, mapper) - .doOnSuccess(any -> AuditTrail.entry() + .doOnEach(ReactorUtils.logFinally(() -> AuditTrail.entry() .username(() -> fromSession.getUser().asString()) .sessionId(() -> String.valueOf(fromSession.getSessionId().getValue())) .protocol("mailbox") @@ -690,7 +691,7 @@ public class StoreMailboxManager implements MailboxManager { .parameters(Throwing.supplier(() -> ImmutableMap.of("mailboxId", mailbox.getMailboxId().serialize(), "fromMailboxPath", mailbox.generateAssociatedPath().asString(), "toMailboxPath", to.asString()))) - .log("Mailbox Rename")) + .log("Mailbox Rename"))) .flatMap(renamedResults -> renameSubscriptionsIfNeeded(renamedResults, option, fromSession, toSession)))); } diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMessageRangeProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMessageRangeProcessor.java index d84f430ece..0c8aa5b8e8 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMessageRangeProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMessageRangeProcessor.java @@ -121,7 +121,7 @@ public abstract class AbstractMessageRangeProcessor<R extends AbstractMessageRan .sneakyThrow()) .filter(Objects::nonNull) .concatMap(range -> process(target.getId(), session.getSelected(), mailboxSession, range) - .doOnNext(result -> AuditTrail.entry() + .doOnEach(ReactorUtils.logFinally(() -> AuditTrail.entry() .username(() -> mailboxSession.getUser().asString()) .sessionId(() -> session.sessionId().asString()) .protocol("IMAP") @@ -130,7 +130,7 @@ public abstract class AbstractMessageRangeProcessor<R extends AbstractMessageRan "targetId", target.getId().serialize(), "selectedMailboxId", session.getSelected().getMailboxId().serialize(), "range", range.getUidFrom().asLong() + ":" + range.getUidTo().asLong())) - .log("IMAP " + getOperationName() + " succeeded.")) + .log("IMAP " + getOperationName() + " succeeded."))) .map(IdRange::from)) .collect(ImmutableList.<IdRange>toImmutableList()) .map(IdRange::mergeRanges) diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/DeleteProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/DeleteProcessor.java index acfef263e2..60f0258d0f 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/DeleteProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/DeleteProcessor.java @@ -72,7 +72,7 @@ public class DeleteProcessor extends AbstractMailboxProcessor<DeleteRequest> { .then(unsolicitedResponses(session, responder, false)) .then(Mono.fromRunnable(() -> okComplete(request, responder))) .then() - .doOnSuccess(any -> auditTrail(session, mailboxPath)) + .doOnEach(ReactorUtils.logFinally(() -> auditTrail(session, mailboxPath))) .onErrorResume(MailboxNotFoundException.class, e -> { no(request, responder, HumanReadableText.FAILURE_NO_SUCH_MAILBOX); return ReactorUtils.logAsMono(() -> LOGGER.debug("Delete failed for mailbox {} as it doesn't exist", mailboxPath, e)); diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/ExpungeProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/ExpungeProcessor.java index 3e51eb874e..abdcb4cf3d 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/ExpungeProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/ExpungeProcessor.java @@ -129,7 +129,7 @@ public class ExpungeProcessor extends AbstractMailboxProcessor<ExpungeRequest> i .doOnNext(selected::removeRecent) .count() .map(Long::intValue) - .doOnSuccess(count -> AuditTrail.entry() + .flatMap(count -> ReactorUtils.logAsMono(() -> AuditTrail.entry() .username(() -> mailboxSession.getUser().asString()) .sessionId(() -> session.sessionId().asString()) .protocol("IMAP") @@ -137,7 +137,8 @@ public class ExpungeProcessor extends AbstractMailboxProcessor<ExpungeRequest> i .parameters(() -> ImmutableMap.of("loggedInUser", mailboxSession.getLoggedInUser().map(Username::asString).orElse(""), "mailboxId", mailbox.getId().serialize(), "messageUids", range.toString())) - .log(String.format("IMAP EXPUNGE succeeded. %d message deleted.", count))); + .log(String.format("IMAP EXPUNGE succeeded. %d message deleted.", count))) + .thenReturn(count)); } @Override diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java index 0ab9930ebb..b785b6b6ac 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/fetch/FetchProcessor.java @@ -362,7 +362,7 @@ public class FetchProcessor extends AbstractMailboxProcessor<FetchRequest> { .subscribe(fetchSubscriber); } else { Flux.fromIterable(consolidate(selected, ranges, fetch)) - .doOnNext(range -> auditTrail(mailbox, mailboxSession, resultToFetch, range)) + .flatMap(range -> ReactorUtils.logAsMono(() -> auditTrail(mailbox, mailboxSession, resultToFetch, range)).thenReturn(range)) .concatMap(range -> Flux.from(mailbox.getMessagesReactive(range, resultToFetch, mailboxSession))) .filter(ids -> !fetch.contains(Item.MODSEQ) || ids.getModSeq().asLong() > fetch.getChangedSince()) .concatMap(result -> toResponse(mailbox, fetch, mailboxSession, selected, result)) diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java index 8c26be6382..0f69d237cf 100644 --- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java +++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java @@ -190,8 +190,12 @@ public class ReactorUtils { }; } - public static Consumer<Signal<?>> log(Runnable logStatement) { - return signal -> logWithContext(logStatement, signal.getContextView()); + public static Consumer<Signal<?>> logFinally(Runnable logStatement) { + return signal -> { + if (signal.isOnComplete()) { + logWithContext(logStatement, signal.getContextView()); + } + }; } private static void logWithContext(Runnable logStatement, ContextView contextView) { diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java index ca5a1c3e28..bd4693b18e 100644 --- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java +++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java @@ -746,9 +746,7 @@ class ReactorUtilsTest { String key = "key"; Flux.just(1) - .doOnEach(ReactorUtils.log(() -> { - assertThat(MDC.get(key)).isEqualTo(value); - })) + .doOnEach(ReactorUtils.logFinally(() -> assertThat(MDC.get(key)).isEqualTo(value))) .contextWrite(ReactorUtils.context("test", MDCBuilder.ofValue(key, value))) .blockLast(); } @@ -760,9 +758,7 @@ class ReactorUtilsTest { String key = "key"; Flux.just(1) - .doOnEach(ReactorUtils.log(() -> { - assertThat(MDC.get(key)).isEqualTo(value1); - })) + .doOnEach(ReactorUtils.logFinally(() -> assertThat(MDC.get(key)).isEqualTo(value1))) .contextWrite(ReactorUtils.context("test", MDCBuilder.ofValue(key, value1))) .contextWrite(ReactorUtils.context("test", MDCBuilder.ofValue(key, value2))) .blockLast(); @@ -776,7 +772,7 @@ class ReactorUtilsTest { String key2 = "key2"; Flux.just(1) - .doOnEach(ReactorUtils.log(() -> { + .doOnEach(ReactorUtils.logFinally(() -> { assertThat(MDC.get(key1)).isEqualTo(value1); assertThat(MDC.get(key2)).isEqualTo(value2); })) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
