This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 2c53f0130cca2d5fd3aa5fde08f61d5a261abb81
Author: Benoit Tellier <btell...@linagora.com>
AuthorDate: Fri Jun 2 17:07:37 2023 +0700

    [FIX] JPA: avoid leaks in entity manager
---
 .../listeners/SetCustomFlagOnBigMessages.java      |  1 +
 .../james/mailbox/backup/DefaultMailboxBackup.java | 16 +++++----
 .../mailbox/backup/ZipMailArchiveRestorer.java     |  1 +
 .../CassandraMailboxSessionMapperFactory.java      |  5 +++
 ...CassandraRecomputeCurrentQuotasServiceTest.java |  2 +-
 .../task/JPARecomputeCurrentQuotasServiceTest.java |  2 +-
 .../MemoryRecomputeCurrentQuotasServiceTest.java   |  4 ++-
 .../mailbox/store/SystemMailboxesProviderImpl.java |  3 +-
 .../store/event/MailboxAnnotationListener.java     |  3 +-
 .../store/quota/DefaultUserQuotaRootResolver.java  |  6 ++--
 .../store/search/ListeningMessageSearchIndex.java  |  6 ++--
 .../store/event/MailboxAnnotationListenerTest.java |  3 ++
 .../mailbox/tools/indexer/ReIndexerImpl.java       |  1 +
 .../mailbox/tools/indexer/ReIndexerPerformer.java  | 18 ++++++----
 .../quota/task/RecomputeCurrentQuotasService.java  |  9 +++--
 .../james/healthcheck/MailReceptionCheck.java      |  3 +-
 .../adapter/mailbox/ACLUsernameChangeTaskStep.java |  7 ++--
 .../mailbox/MailboxUserDeletionTaskStep.java       |  6 ++--
 .../mailbox/MailboxUsernameChangeTaskStep.java     | 16 ++++++---
 .../james/transport/mailets/RandomStoring.java     |  3 +-
 .../jmap/draft/send/PostDequeueDecorator.java      |  1 +
 .../james/jmap/mailet/filter/ActionApplier.java    |  1 +
 .../webadmin/vault/routes/RestoreService.java      |  3 +-
 .../webadmin/service/CreateMissingParentsTask.java | 15 ++++----
 .../webadmin/service/ExpireMailboxService.java     |  3 +-
 .../webadmin/service/UserMailboxesService.java     | 41 ++++++++++++++++------
 .../org/apache/james/rspamd/RspamdListener.java    | 10 ++++--
 .../rspamd/task/GetMailboxMessagesService.java     |  6 ++--
 .../james/spamassassin/SpamAssassinListener.java   |  2 ++
 29 files changed, 139 insertions(+), 58 deletions(-)

diff --git 
a/examples/custom-listeners/src/main/java/org/apache/james/examples/custom/listeners/SetCustomFlagOnBigMessages.java
 
b/examples/custom-listeners/src/main/java/org/apache/james/examples/custom/listeners/SetCustomFlagOnBigMessages.java
index d7223c1d6b..192e6df1a0 100644
--- 
a/examples/custom-listeners/src/main/java/org/apache/james/examples/custom/listeners/SetCustomFlagOnBigMessages.java
+++ 
b/examples/custom-listeners/src/main/java/org/apache/james/examples/custom/listeners/SetCustomFlagOnBigMessages.java
@@ -86,6 +86,7 @@ class SetCustomFlagOnBigMessages implements 
EventListener.GroupEventListener {
                 FlagsUpdateMode.ADD,
                 MessageRange.one(messageUid),
                 session);
+            mailboxManager.endProcessingRequest(session);
         } catch (MailboxException e) {
             LOGGER.error("error happens when adding '{}' flag to the message 
with uid {} in mailbox {} of user {}",
                 BIG_MESSAGE, messageUid.asLong(), addedEvent.getMailboxId(), 
addedEvent.getUsername().asString(), e);
diff --git 
a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java
 
b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java
index 0c9fd32bcd..bc6697a88a 100644
--- 
a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java
+++ 
b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java
@@ -32,7 +32,6 @@ import org.apache.james.core.Username;
 import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageManager;
-import org.apache.james.mailbox.exception.BadCredentialsException;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.FetchGroup;
 import org.apache.james.mailbox.model.Mailbox;
@@ -98,14 +97,19 @@ public class DefaultMailboxBackup implements MailboxBackup {
 
         Stream<MessageResult> messages = allMessagesForUser(accountContents);
         archive(mailboxes, messages, destination);
+        mailboxManager.endProcessingRequest(session);
     }
 
-    private boolean isAccountNonEmpty(Username username) throws 
BadCredentialsException, MailboxException, IOException {
+    private boolean isAccountNonEmpty(Username username) throws 
MailboxException {
         MailboxSession session = mailboxManager.createSystemSession(username);
-        return getAccountContentForUser(session)
-            .stream()
-            .findFirst()
-            .isPresent();
+        try {
+            return getAccountContentForUser(session)
+                .stream()
+                .findFirst()
+                .isPresent();
+        } finally {
+            mailboxManager.endProcessingRequest(session);
+        }
     }
 
     @Override
diff --git 
a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/ZipMailArchiveRestorer.java
 
b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/ZipMailArchiveRestorer.java
index 5830c69019..215f7b19bf 100644
--- 
a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/ZipMailArchiveRestorer.java
+++ 
b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/ZipMailArchiveRestorer.java
@@ -58,6 +58,7 @@ public class ZipMailArchiveRestorer implements 
MailArchiveRestorer {
     public void restore(Username username, InputStream source) throws 
MailboxException, IOException {
         MailboxSession session = mailboxManager.createSystemSession(username);
         restoreEntries(source, session);
+        mailboxManager.endProcessingRequest(session);
     }
 
     private void restoreEntries(InputStream source, MailboxSession session) 
throws IOException {
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 84d86b1882..fb3069869b 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -231,6 +231,11 @@ public class CassandraMailboxSessionMapperFactory extends 
MailboxSessionMapperFa
         return cassandraSubscriptionMapper;
     }
 
+    @Override
+    public void endProcessingRequest(MailboxSession session) {
+
+    }
+
     public DeleteMessageListener deleteMessageListener() {
         return new DeleteMessageListener(threadDAO, threadLookupDAO, 
imapUidDAO, messageIdDAO, messageDAO, messageDAOV3, attachmentDAOV2,
             attachmentMessageIdDAO, aclMapper, userMailboxRightsDAO, 
applicableFlagDAO, firstUnseenDAO, deletedMessageDAO,
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java
index 113b741ed4..9b5d3b628a 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java
@@ -86,7 +86,7 @@ public class CassandraRecomputeCurrentQuotasServiceTest 
implements RecomputeCurr
         userQuotaRootResolver = new 
DefaultUserQuotaRootResolver(sessionProvider, mapperFactory);
         CurrentQuotaCalculator currentQuotaCalculator = new 
CurrentQuotaCalculator(mapperFactory, userQuotaRootResolver);
 
-        testee = new RecomputeCurrentQuotasService(usersRepository, 
currentQuotaManager, currentQuotaCalculator, userQuotaRootResolver, 
sessionProvider);
+        testee = new RecomputeCurrentQuotasService(usersRepository, 
currentQuotaManager, currentQuotaCalculator, userQuotaRootResolver, 
sessionProvider, mailboxManager);
     }
 
     @Override
diff --git 
a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/task/JPARecomputeCurrentQuotasServiceTest.java
 
b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/task/JPARecomputeCurrentQuotasServiceTest.java
index 5d5dfbf696..43925070a6 100644
--- 
a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/task/JPARecomputeCurrentQuotasServiceTest.java
+++ 
b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/task/JPARecomputeCurrentQuotasServiceTest.java
@@ -87,7 +87,7 @@ class JPARecomputeCurrentQuotasServiceTest implements 
RecomputeCurrentQuotasServ
 
         CurrentQuotaCalculator currentQuotaCalculator = new 
CurrentQuotaCalculator(mapperFactory, userQuotaRootResolver);
 
-        testee = new RecomputeCurrentQuotasService(usersRepository, 
currentQuotaManager, currentQuotaCalculator, userQuotaRootResolver, 
sessionProvider);
+        testee = new RecomputeCurrentQuotasService(usersRepository, 
currentQuotaManager, currentQuotaCalculator, userQuotaRootResolver, 
sessionProvider, mailboxManager);
     }
 
     @AfterEach
diff --git 
a/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/mail/task/MemoryRecomputeCurrentQuotasServiceTest.java
 
b/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/mail/task/MemoryRecomputeCurrentQuotasServiceTest.java
index 328fccb798..2fa06bde62 100644
--- 
a/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/mail/task/MemoryRecomputeCurrentQuotasServiceTest.java
+++ 
b/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/mail/task/MemoryRecomputeCurrentQuotasServiceTest.java
@@ -26,6 +26,7 @@ import 
org.apache.james.domainlist.lib.DomainListConfiguration;
 import org.apache.james.domainlist.memory.MemoryDomainList;
 import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.SessionProvider;
+import org.apache.james.mailbox.inmemory.InMemoryMailboxManager;
 import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
 import org.apache.james.mailbox.quota.CurrentQuotaManager;
 import org.apache.james.mailbox.quota.UserQuotaRootResolver;
@@ -52,7 +53,8 @@ class MemoryRecomputeCurrentQuotasServiceTest implements 
RecomputeCurrentQuotasS
         usersRepository = 
MemoryUsersRepository.withoutVirtualHosting(memoryDomainList);
 
         resources = InMemoryIntegrationResources.defaultResources();
-        testee = new RecomputeCurrentQuotasService(usersRepository, 
resources.getCurrentQuotaManager(), resources.getCurrentQuotaCalculator(), 
resources.getDefaultUserQuotaRootResolver(), 
resources.getMailboxManager().getSessionProvider());
+        InMemoryMailboxManager mailboxManager = resources.getMailboxManager();
+        testee = new RecomputeCurrentQuotasService(usersRepository, 
resources.getCurrentQuotaManager(), resources.getCurrentQuotaCalculator(), 
resources.getDefaultUserQuotaRootResolver(), 
mailboxManager.getSessionProvider(), mailboxManager);
     }
 
     @Override
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/SystemMailboxesProviderImpl.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/SystemMailboxesProviderImpl.java
index 3b6a2fbd7c..a1518902b1 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/SystemMailboxesProviderImpl.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/SystemMailboxesProviderImpl.java
@@ -57,7 +57,8 @@ public class SystemMailboxesProviderImpl implements 
SystemMailboxesProvider {
 
         return Mono.from(mailboxManager.getMailboxReactive(mailboxPath, 
session))
             .flux()
-            .onErrorResume(MailboxNotFoundException.class, e -> 
searchMessageManagerByMailboxRole(aRole, username));
+            .onErrorResume(MailboxNotFoundException.class, e -> 
searchMessageManagerByMailboxRole(aRole, username))
+            .doFinally(any -> mailboxManager.endProcessingRequest(session));
     }
 
     private boolean hasRole(Role aRole, MailboxPath mailBoxPath) {
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxAnnotationListener.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxAnnotationListener.java
index 6385f9caf2..8d11cb6ec9 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxAnnotationListener.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxAnnotationListener.java
@@ -69,7 +69,8 @@ public class MailboxAnnotationListener implements 
EventListener.ReactiveGroupEve
 
             return 
Flux.from(annotationMapper.getAllAnnotationsReactive(mailboxId))
                 .flatMap(annotation -> 
Mono.from(annotationMapper.deleteAnnotationReactive(mailboxId, 
annotation.getKey())))
-                .then();
+                .then()
+                .doFinally(any -> 
mailboxSessionMapperFactory.endProcessingRequest(mailboxSession));
         }
         return Mono.empty();
     }
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java
index 00506ca094..db58bf198f 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java
@@ -121,7 +121,8 @@ public class DefaultUserQuotaRootResolver implements 
UserQuotaRootResolver {
         return factory.getMailboxMapper(session)
             .findMailboxById(mailboxId)
             .map(Mailbox::generateAssociatedPath)
-            .flatMap(path -> Mono.from(getQuotaRootReactive(path)));
+            .flatMap(path -> Mono.from(getQuotaRootReactive(path)))
+            .doFinally(any -> factory.endProcessingRequest(session));
     }
 
     @Override
@@ -138,7 +139,8 @@ public class DefaultUserQuotaRootResolver implements 
UserQuotaRootResolver {
             .flatMap(this::getQuotaRootReactive, 
ReactorUtils.DEFAULT_CONCURRENCY)
             .distinct();
 
-        return Flux.concat(quotaRootListFromDelegatedMailboxes, 
Flux.just(forUser(username)));
+        return Flux.concat(quotaRootListFromDelegatedMailboxes, 
Flux.just(forUser(username)))
+            .doFinally(any -> factory.endProcessingRequest(session));
     }
 
     @Override
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
index 3eb17a3661..5fb5019d56 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
@@ -95,9 +95,9 @@ public abstract class ListeningMessageSearchIndex implements 
MessageSearchIndex,
      */
     @Override
     public Mono<Void> reactiveEvent(Event event) {
-        return handleMailboxEvent(event,
-            sessionProvider.createSystemSession(event.getUsername()),
-            (MailboxEvent) event);
+        MailboxSession systemSession = 
sessionProvider.createSystemSession(event.getUsername());
+        return handleMailboxEvent(event, systemSession, (MailboxEvent) event)
+            .then(Mono.fromRunnable(() -> 
factory.endProcessingRequest(systemSession)));
     }
 
     private Mono<Void> handleMailboxEvent(Event event, MailboxSession session, 
MailboxEvent mailboxEvent) {
diff --git 
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/MailboxAnnotationListenerTest.java
 
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/MailboxAnnotationListenerTest.java
index 1c310a5447..19739fdac7 100644
--- 
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/MailboxAnnotationListenerTest.java
+++ 
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/MailboxAnnotationListenerTest.java
@@ -125,6 +125,7 @@ class MailboxAnnotationListenerTest {
         listener.event(deleteEvent);
 
         
verify(mailboxSessionMapperFactory).getAnnotationMapper(eq(mailboxSession));
+        
verify(mailboxSessionMapperFactory).endProcessingRequest(eq(mailboxSession));
         verify(annotationMapper).getAllAnnotationsReactive(eq(mailboxId));
 
         verifyNoMoreInteractions(mailboxSessionMapperFactory);
@@ -140,6 +141,7 @@ class MailboxAnnotationListenerTest {
         listener.event(deleteEvent);
 
         
verify(mailboxSessionMapperFactory).getAnnotationMapper(eq(mailboxSession));
+        
verify(mailboxSessionMapperFactory).endProcessingRequest(eq(mailboxSession));
         verify(annotationMapper).getAllAnnotationsReactive(eq(mailboxId));
         verify(annotationMapper).deleteAnnotationReactive(eq(mailboxId), 
eq(PRIVATE_KEY));
         verify(annotationMapper).deleteAnnotationReactive(eq(mailboxId), 
eq(SHARED_KEY));
@@ -158,6 +160,7 @@ class MailboxAnnotationListenerTest {
         assertThatThrownBy(() -> 
listener.event(deleteEvent)).isInstanceOf(RuntimeException.class);
 
         
verify(mailboxSessionMapperFactory).getAnnotationMapper(eq(mailboxSession));
+        
verify(mailboxSessionMapperFactory).endProcessingRequest(eq(mailboxSession));
         verify(annotationMapper).getAllAnnotationsReactive(eq(mailboxId));
         verify(annotationMapper).deleteAnnotationReactive(eq(mailboxId), 
eq(PRIVATE_KEY));
 
diff --git 
a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java
 
b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java
index f557602cd1..d4adbfa34f 100644
--- 
a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java
+++ 
b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java
@@ -106,5 +106,6 @@ public class ReIndexerImpl implements ReIndexer {
     private void validateIdExists(MailboxId mailboxId) throws MailboxException 
{
         MailboxSession mailboxSession = 
mailboxManager.createSystemSession(Username.of("ReIndexingImap"));
         
MailboxReactorUtils.block(mapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId));
+        mailboxManager.endProcessingRequest(mailboxSession);
     }
 }
diff --git 
a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
 
b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
index eafd50fe2a..9a1d779bc1 100644
--- 
a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
+++ 
b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
@@ -155,7 +155,8 @@ public class ReIndexerPerformer {
             .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, 
mailboxSession, runningOptions), MAILBOX_CONCURRENCY);
 
         return reIndexMessages(entriesToIndex, runningOptions, 
reIndexingContext)
-            .doFinally(any -> LOGGER.info("Full reindex finished"));
+            .doFinally(any -> LOGGER.info("Full reindex finished"))
+            .doFinally(any -> 
mailboxManager.endProcessingRequest(mailboxSession));
     }
 
     Mono<Result> reIndexSingleMailbox(MailboxId mailboxId, ReIndexingContext 
reIndexingContext, RunningOptions runningOptions) {
@@ -165,7 +166,8 @@ public class ReIndexerPerformer {
             .findMailboxById(mailboxId)
             .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, 
mailboxSession, runningOptions));
 
-        return reIndexMessages(entriesToIndex, runningOptions, 
reIndexingContext);
+        return reIndexMessages(entriesToIndex, runningOptions, 
reIndexingContext)
+            .doFinally(any -> 
mailboxManager.endProcessingRequest(mailboxSession));
     }
 
     Mono<Result> reIndexUserMailboxes(Username username, ReIndexingContext 
reIndexingContext, RunningOptions runningOptions) {
@@ -180,7 +182,8 @@ public class ReIndexerPerformer {
                 .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, 
mailboxSession, runningOptions), MAILBOX_CONCURRENCY);
 
             return reIndexMessages(entriesToIndex, runningOptions, 
reIndexingContext)
-                .doFinally(any -> LOGGER.info("User {} reindex finished", 
username.asString()));
+                .doFinally(any -> LOGGER.info("User {} reindex finished", 
username.asString()))
+                .doFinally(any -> 
mailboxManager.endProcessingRequest(mailboxSession));
         } catch (Exception e) {
             LOGGER.error("Error fetching mailboxes for user: {}", 
username.asString());
             return Mono.just(Result.PARTIAL);
@@ -195,7 +198,8 @@ public class ReIndexerPerformer {
             .map(mailbox -> new ReIndexingEntry(mailbox, mailboxSession, uid))
             .flatMap(this::fullyReadMessage)
             .flatMap(message -> reIndex(message, mailboxSession))
-            .switchIfEmpty(Mono.just(Result.COMPLETED));
+            .switchIfEmpty(Mono.just(Result.COMPLETED))
+            .doFinally(any -> 
mailboxManager.endProcessingRequest(mailboxSession));
     }
 
     Mono<Result> reIndexMessageId(MessageId messageId) {
@@ -209,7 +213,8 @@ public class ReIndexerPerformer {
             .onErrorResume(e -> {
                 LOGGER.warn("Failed to re-index {}", messageId, e);
                 return Mono.just(Result.PARTIAL);
-            });
+            })
+            .doFinally(any -> mailboxManager.endProcessingRequest(session));
     }
 
     Mono<Result> reIndexErrors(ReIndexingContext reIndexingContext, 
ReIndexingExecutionFailures previousReIndexingFailures, RunningOptions 
runningOptions) {
@@ -227,7 +232,8 @@ public class ReIndexerPerformer {
                         return Mono.just(Either.left(new 
MailboxFailure(mailboxId)));
                     }), MAILBOX_CONCURRENCY));
 
-        return reIndexMessages(entriesToIndex, runningOptions, 
reIndexingContext);
+        return reIndexMessages(entriesToIndex, runningOptions, 
reIndexingContext)
+            .doFinally(any -> 
mailboxManager.endProcessingRequest(mailboxSession));
     }
 
     private Mono<Result> reIndex(MailboxMessage mailboxMessage, MailboxSession 
session) {
diff --git 
a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
 
b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
index acd288dfe7..f2d1a9c431 100644
--- 
a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
+++ 
b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.inject.Inject;
 
 import org.apache.james.core.Username;
+import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.SessionProvider;
 import org.apache.james.mailbox.model.QuotaOperation;
@@ -147,18 +148,21 @@ public class RecomputeCurrentQuotasService {
     private final CurrentQuotaCalculator currentQuotaCalculator;
     private final UserQuotaRootResolver userQuotaRootResolver;
     private final SessionProvider sessionProvider;
+    private final MailboxManager mailboxManager;
 
     @Inject
     public RecomputeCurrentQuotasService(UsersRepository usersRepository,
                                          CurrentQuotaManager 
storeCurrentQuotaManager,
                                          CurrentQuotaCalculator 
currentQuotaCalculator,
                                          UserQuotaRootResolver 
userQuotaRootResolver,
-                                         SessionProvider sessionProvider) {
+                                         SessionProvider sessionProvider,
+                                         MailboxManager mailboxManager) {
         this.usersRepository = usersRepository;
         this.storeCurrentQuotaManager = storeCurrentQuotaManager;
         this.currentQuotaCalculator = currentQuotaCalculator;
         this.userQuotaRootResolver = userQuotaRootResolver;
         this.sessionProvider = sessionProvider;
+        this.mailboxManager = mailboxManager;
     }
 
     public Mono<Task.Result> recomputeCurrentQuotas(Context context, 
RunningOptions runningOptions) {
@@ -190,6 +194,7 @@ public class RecomputeCurrentQuotasService {
                 LOGGER.error("Error while recomputing current quotas for {}", 
quotaRoot, e);
                 context.addToFailedMailboxes(quotaRoot);
                 return Mono.just(Task.Result.PARTIAL);
-            });
+            })
+            .doFinally(any -> mailboxManager.endProcessingRequest(session));
     }
 }
diff --git 
a/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java
 
b/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java
index c8f31d0844..8d865ee097 100644
--- 
a/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java
+++ 
b/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java
@@ -223,7 +223,8 @@ public class MailReceptionCheck implements HealthCheck {
             .onErrorResume(e -> {
                 LOGGER.error("Mail reception check failed", e);
                 return Mono.just(Result.unhealthy(componentName(), 
e.getMessage()));
-            });
+            })
+            .doFinally(any -> mailboxManager.endProcessingRequest(session));
     }
 
     private Mono<MessageManager> retrieveInbox(Username username, 
MailboxSession session) {
diff --git 
a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/ACLUsernameChangeTaskStep.java
 
b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/ACLUsernameChangeTaskStep.java
index 813dc8a043..7027645eb0 100644
--- 
a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/ACLUsernameChangeTaskStep.java
+++ 
b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/ACLUsernameChangeTaskStep.java
@@ -67,7 +67,9 @@ public class ACLUsernameChangeTaskStep implements 
UsernameChangeTaskStep {
         return 
mailboxManager.search(MailboxQuery.builder().matchesAllMailboxNames().build(), 
oldSession)
             .filter(mailbox -> 
!mailbox.getPath().getUser().equals(oldUsername))
             .concatMap(mailbox -> migrateACLs(oldUsername, newUsername, 
mailbox))
-            .then(updateSubscriptionsOnDeletedMailboxes(oldUsername, 
oldSession, newSession));
+            .then(updateSubscriptionsOnDeletedMailboxes(oldUsername, 
oldSession, newSession))
+            .doFinally(any -> mailboxManager.endProcessingRequest(oldSession))
+            .doFinally(any -> mailboxManager.endProcessingRequest(newSession));
     }
 
     private Mono<Void> updateSubscriptionsOnDeletedMailboxes(Username 
oldUsername, MailboxSession oldSession, MailboxSession newSession) {
@@ -90,6 +92,7 @@ public class ACLUsernameChangeTaskStep implements 
UsernameChangeTaskStep {
         return Mono.fromRunnable(Throwing.runnable(() -> 
mailboxManager.applyRightsCommand(mailbox.getId(), 
MailboxACL.command().rights(rights).forUser(newUsername).asAddition(), 
ownerSession)))
             .then(Mono.fromRunnable(Throwing.runnable(() -> 
mailboxManager.applyRightsCommand(mailbox.getId(), 
MailboxACL.command().rights(rights).forUser(oldUsername).asRemoval(), 
ownerSession))))
             .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
-            .then();
+            .then()
+            .doFinally(any -> 
mailboxManager.endProcessingRequest(ownerSession));
     }
 }
diff --git 
a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUserDeletionTaskStep.java
 
b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUserDeletionTaskStep.java
index 035686decf..d916fbf9b4 100644
--- 
a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUserDeletionTaskStep.java
+++ 
b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUserDeletionTaskStep.java
@@ -71,7 +71,8 @@ public class MailboxUserDeletionTaskStep implements 
DeleteUserDataTaskStep {
             .then(getSharedMailboxesOfUser(mailboxSession)
                 .flatMap(sharedMailbox -> revokeACLs(username, sharedMailbox)
                     .then(deleteSubscription(mailboxSession, sharedMailbox)))
-                .then());
+                .then())
+            .doFinally(any -> 
mailboxManager.endProcessingRequest(mailboxSession));
     }
 
     private Flux<MailboxMetaData> getAllMailboxesOfUser(MailboxSession 
mailboxSession) {
@@ -103,7 +104,8 @@ public class MailboxUserDeletionTaskStep implements 
DeleteUserDataTaskStep {
 
         return Mono.fromRunnable(Throwing.runnable(() -> 
mailboxManager.applyRightsCommand(mailbox.getId(), 
MailboxACL.command().rights(rights).forUser(username).asRemoval(), 
ownerSession)))
             .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
-            .then();
+            .then()
+            .doFinally(any -> 
mailboxManager.endProcessingRequest(ownerSession));
     }
 
 }
diff --git 
a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java
 
b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java
index cdf02d268a..a28d9544ad 100644
--- 
a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java
+++ 
b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java
@@ -72,7 +72,9 @@ public class MailboxUsernameChangeTaskStep implements 
UsernameChangeTaskStep {
         return mailboxManager.search(queryUser, 
MailboxManager.MailboxSearchFetchType.Minimal, fromSession)
             // Only keep top level, rename takes care of sub mailboxes
             .filter(mailbox -> 
mailbox.getPath().getHierarchyLevels(fromSession.getPathDelimiter()).size() == 
1)
-            .concatMap(mailbox -> migrateMailbox(fromSession, toSession, 
mailbox));
+            .concatMap(mailbox -> migrateMailbox(fromSession, toSession, 
mailbox))
+            .doFinally(any -> mailboxManager.endProcessingRequest(fromSession))
+            .doFinally(any -> mailboxManager.endProcessingRequest(toSession));
     }
 
     private Mono<Void> migrateMailbox(MailboxSession fromSession, 
MailboxSession toSession, org.apache.james.mailbox.model.MailboxMetaData 
mailbox) {
@@ -115,16 +117,20 @@ public class MailboxUsernameChangeTaskStep implements UsernameChangeTaskStep {
         return Flux.fromIterable(mailbox.getResolvedAcls().getEntries().entrySet())
             .filter(entry -> entry.getKey().getNameType() == MailboxACL.NameType.user && !entry.getKey().isNegative())
             .map(entry -> Username.of(entry.getKey().getName()))
-            .concatMap(Throwing.function(userWithAccess ->
-                Flux.from(subscriptionManager.subscriptionsReactive(mailboxManager.createSystemSession(userWithAccess)))
+            .concatMap(Throwing.function(userWithAccess -> {
+                MailboxSession session = mailboxManager.createSystemSession(userWithAccess);
+                return Flux.from(subscriptionManager.subscriptionsReactive(session))
                     .filter(subscribedMailbox -> subscribedMailbox.equals(mailbox.getPath()))
-                    .concatMap(any -> renameSubscription(mailbox, renamedPath, userWithAccess))))
+                    .concatMap(any -> renameSubscription(mailbox, renamedPath, userWithAccess))
+                    .doFinally(any -> mailboxManager.endProcessingRequest(session));
+            }))
             .then();
     }
 
     private Mono<Void> renameSubscription(MailboxMetaData mailbox, MailboxPath renamedPath, Username user) {
         MailboxSession session = mailboxManager.createSystemSession(user);
         return Mono.from(subscriptionManager.subscribeReactive(renamedPath, session))
-            .then(Mono.from(subscriptionManager.unsubscribeReactive(mailbox.getPath(), session)));
+            .then(Mono.from(subscriptionManager.unsubscribeReactive(mailbox.getPath(), session)))
+            .doFinally(any -> mailboxManager.endProcessingRequest(session));
     }
 }
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java
index 6b817023d3..c1eb916848 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java
@@ -122,7 +122,8 @@ public class RandomStoring extends GenericMailet {
             MailboxSession session = mailboxManager.createSystemSession(username);
             return mailboxManager
                 .search(MailboxQuery.privateMailboxesBuilder(session).build(), Minimal, session)
-                .map(metaData -> new ReroutingInfos(metaData.getPath().getName(), username));
+                .map(metaData -> new ReroutingInfos(metaData.getPath().getName(), username))
+                .doFinally(any -> mailboxManager.endProcessingRequest(session));
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java
index 44daed400f..be51824591 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java
@@ -88,6 +88,7 @@ public class PostDequeueDecorator extends MailQueueItemDecorator {
                     MailboxSession mailboxSession = mailboxManager.createSystemSession(Username.of(username.get()));
                     moveFromOutboxToSentWithSeenFlag(messageId, mailboxSession);
                     getMail().setAttribute(IS_DELIVERED);
+                    mailboxManager.endProcessingRequest(mailboxSession);
                 } catch (MailShouldBeInOutboxException e) {
                     LOG.info("Message does not exist on Outbox anymore, it could have already been sent", e);
                 }
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/ActionApplier.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/ActionApplier.java
index 5dcbc2b432..d9845cc581 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/ActionApplier.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/ActionApplier.java
@@ -124,6 +124,7 @@ public class ActionApplier {
             MailboxId mailboxId = mailboxIdFactory.fromString(mailboxIdString);
             MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
             MessageManager messageManager = mailboxManager.getMailbox(mailboxId, mailboxSession);
+            mailboxManager.endProcessingRequest(mailboxSession);
 
             return Stream.of(messageManager.getMailboxPath().getName());
         } catch (MailboxNotFoundException e) {
diff --git a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/RestoreService.java b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/RestoreService.java
index 554e273434..bca7bcfeea 100644
--- a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/RestoreService.java
+++ b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/RestoreService.java
@@ -79,7 +79,8 @@ public class RestoreService {
         MessageManager restoreMessageManager = restoreMailboxManager(session);
 
         return Flux.from(deletedMessageVault.search(usernameToRestore, searchQuery))
-            .flatMap(deletedMessage -> appendToMailbox(restoreMessageManager, deletedMessage, session), DEFAULT_CONCURRENCY);
+            .flatMap(deletedMessage -> appendToMailbox(restoreMessageManager, deletedMessage, session), DEFAULT_CONCURRENCY)
+            .doFinally(any -> mailboxManager.endProcessingRequest(session));
     }
 
     private Mono<RestoreResult> appendToMailbox(MessageManager restoreMailboxManager, DeletedMessage deletedMessage, MailboxSession session) {
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/CreateMissingParentsTask.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/CreateMissingParentsTask.java
index d8da1c6eba..c92b172c24 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/CreateMissingParentsTask.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/CreateMissingParentsTask.java
@@ -153,6 +153,8 @@ public class CreateMissingParentsTask implements Task {
         } catch (MailboxException e) {
             LOGGER.error("Error fetching mailbox paths", e);
             return Result.PARTIAL;
+        } finally {
+            mailboxManager.endProcessingRequest(session);
         }
     }
 
@@ -160,12 +162,13 @@ public class CreateMissingParentsTask implements Task {
         MailboxSession ownerSession = mailboxManager.createSystemSession(path.getUser());
         return Mono.from(mailboxManager.createMailboxReactive(path, ownerSession))
             .doOnNext(this::recordSuccess)
-        .then(Mono.just(Result.COMPLETED))
-        .onErrorResume(e -> {
-            LOGGER.error("Error creating missing parent mailbox: {}", path.getName(), e);
-            recordFailure(path);
-            return Mono.just(Result.PARTIAL);
-        });
+            .then(Mono.just(Result.COMPLETED))
+            .onErrorResume(e -> {
+                LOGGER.error("Error creating missing parent mailbox: {}", path.getName(), e);
+                recordFailure(path);
+                return Mono.just(Result.PARTIAL);
+            })
+            .doFinally(any -> mailboxManager.endProcessingRequest(ownerSession));
     }
 
     @Override
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java
index fc709d4bba..526cbdae45 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java
@@ -217,7 +217,8 @@ public class ExpireMailboxService {
                 context.incrementFailedCount();
                 context.incrementProcessedCount();
                 return Mono.just(Task.Result.PARTIAL);
-            });
+            })
+            .doFinally(any -> mailboxManager.endProcessingRequest(session));
     }
 
     private Mono<List<MessageUid>> searchMessagesReactive(MessageManager mgr, MailboxSession session, SearchQuery expiration) {
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/UserMailboxesService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/UserMailboxesService.java
index a7bf0d7291..e57048f5ac 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/UserMailboxesService.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/UserMailboxesService.java
@@ -74,6 +74,7 @@ public class UserMailboxesService {
             MailboxPath mailboxPath = MailboxPath.forUser(username, mailboxName.asString())
                 .assertAcceptable(mailboxSession.getPathDelimiter());
             mailboxManager.createMailbox(mailboxPath, mailboxSession);
+            mailboxManager.endProcessingRequest(mailboxSession);
         } catch (MailboxExistsException e) {
             LOGGER.info("Attempt to create mailbox {} for user {} that already exists", mailboxName, username);
         }
@@ -85,14 +86,19 @@ public class UserMailboxesService {
         listUserMailboxes(mailboxSession)
             .map(MailboxMetaData::getPath)
             .forEach(Throwing.consumer(mailboxPath -> deleteMailbox(mailboxSession, mailboxPath)));
+        mailboxManager.endProcessingRequest(mailboxSession);
     }
 
     public List<MailboxResponse> listMailboxes(Username username) throws UsersRepositoryException {
         usernamePreconditions(username);
         MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
-        return listUserMailboxes(mailboxSession)
-            .map(mailboxMetaData -> new MailboxResponse(mailboxMetaData.getPath().getName(), mailboxMetaData.getId()))
-            .collect(ImmutableList.toImmutableList());
+        try {
+            return listUserMailboxes(mailboxSession)
+                .map(mailboxMetaData -> new MailboxResponse(mailboxMetaData.getPath().getName(), mailboxMetaData.getId()))
+                .collect(ImmutableList.toImmutableList());
+        } finally {
+            mailboxManager.endProcessingRequest(mailboxSession);
+        }
     }
 
     public boolean testMailboxExists(Username username, MailboxName mailboxName) throws MailboxException, UsersRepositoryException {
@@ -100,8 +106,12 @@ public class UserMailboxesService {
         MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
         MailboxPath mailboxPath = MailboxPath.forUser(username, mailboxName.asString())
             .assertAcceptable(mailboxSession.getPathDelimiter());
-        return Mono.from(mailboxManager.mailboxExists(mailboxPath, mailboxSession))
-            .block();
+        try {
+            return Mono.from(mailboxManager.mailboxExists(mailboxPath, mailboxSession))
+                .block();
+        } finally {
+            mailboxManager.endProcessingRequest(mailboxSession);
+        }
     }
 
 
@@ -118,7 +128,8 @@ public class UserMailboxesService {
                 return Mono.just(Result.PARTIAL);
             })
             .reduce(Task::combine)
-            .switchIfEmpty(Mono.just(Result.COMPLETED));
+            .switchIfEmpty(Mono.just(Result.COMPLETED))
+            .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
     }
 
     private Mono<Result> deleteMessage(MessageManager messageManager, MessageUid messageUid, MailboxSession mailboxSession, ClearMailboxContentTask.Context context) {
@@ -138,20 +149,29 @@ public class UserMailboxesService {
             .assertAcceptable(mailboxSession.getPathDelimiter());
         listChildren(mailboxPath, mailboxSession)
             .forEach(Throwing.consumer(path -> deleteMailbox(mailboxSession, path)));
+        mailboxManager.endProcessingRequest(mailboxSession);
     }
 
     public long messageCount(Username username, MailboxName mailboxName) throws UsersRepositoryException, MailboxException {
         usernamePreconditions(username);
         MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
-        return mailboxManager.getMailbox(MailboxPath.forUser(username, mailboxName.asString()), mailboxSession).getMessageCount(mailboxSession);
+        try {
+            return mailboxManager.getMailbox(MailboxPath.forUser(username, mailboxName.asString()), mailboxSession).getMessageCount(mailboxSession);
+        } finally {
+            mailboxManager.endProcessingRequest(mailboxSession);
+        }
     }
 
     public long unseenMessageCount(Username username, MailboxName mailboxName) throws UsersRepositoryException, MailboxException {
         usernamePreconditions(username);
         MailboxSession mailboxSession = mailboxManager.createSystemSession(username);
-        return mailboxManager.getMailbox(MailboxPath.forUser(username, mailboxName.asString()), mailboxSession)
-            .getMailboxCounters(mailboxSession)
-            .getUnseen();
+        try {
+            return mailboxManager.getMailbox(MailboxPath.forUser(username, mailboxName.asString()), mailboxSession)
+                .getMailboxCounters(mailboxSession)
+                .getUnseen();
+        } finally {
+            mailboxManager.endProcessingRequest(mailboxSession);
+        }
     }
 
     private Stream<MailboxPath> listChildren(MailboxPath mailboxPath, MailboxSession mailboxSession) {
@@ -178,6 +198,7 @@ public class UserMailboxesService {
             .assertAcceptable(mailboxSession.getPathDelimiter());
         Preconditions.checkState(Boolean.TRUE.equals(Mono.from(mailboxManager.mailboxExists(mailboxPath, mailboxSession)).block()),
             "Mailbox does not exist. " + mailboxPath.asString());
+        mailboxManager.endProcessingRequest(mailboxSession);
     }
 
     private Stream<MailboxMetaData> listUserMailboxes(MailboxSession mailboxSession) {
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java
index 6af803e4c2..6d9b1c5613 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java
@@ -101,10 +101,12 @@ public class RspamdListener implements SpamEventListener, EventListener.Reactive
     }
 
     private Mono<Void> handleMessageAdded(MailboxEvents.Added addedEvent) {
+        MailboxSession mailboxSession = mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName()));
         return isAppendedToInbox(addedEvent)
             .filter(FunctionalUtils.identityPredicate())
             .doOnNext(isHam -> LOGGER.debug("Ham event detected, EventId = {}", addedEvent.getEventId().getId()))
-            .flatMap(any -> reportHamWhenAdded(addedEvent, mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName()))));
+            .flatMap(any -> reportHamWhenAdded(addedEvent, mailboxSession))
+            .then(Mono.fromRunnable(() -> mailboxManager.endProcessingRequest(mailboxSession)));
     }
 
     private Mono<Void> handleMessageMoved(MessageMoveEvent messageMoveEvent) {
@@ -123,9 +125,11 @@ public class RspamdListener implements SpamEventListener, EventListener.Reactive
     }
 
     private Flux<ByteBuffer> mailboxMessagePublisher(MessageMoveEvent messageMoveEvent) {
-        return Mono.fromCallable(() -> mapperFactory.getMessageIdMapper(mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName()))))
+        MailboxSession mailboxSession = mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName()));
+        return Mono.fromCallable(() -> mapperFactory.getMessageIdMapper(mailboxSession))
             .flatMapMany(messageIdMapper -> messageIdMapper.findReactive(messageMoveEvent.getMessageIds(), MessageMapper.FetchType.FULL))
-            .flatMap(MailboxMessage::getFullContentReactive);
+            .flatMap(MailboxMessage::getFullContentReactive)
+            .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
     }
 
     private Mono<Void> handleMessageMoved(Flux<ByteBuffer> mailboxMessagesPublisher, MessageMoveEvent messageMoveEvent) {
diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
index 55bbb1b089..447fd17687 100644
--- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
+++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java
@@ -102,7 +102,8 @@ public class GetMailboxMessagesService {
             .doOnNext(mailboxMessageMetaData -> context.incrementSpamMessageCount())
             .filter(message -> randomBooleanWithProbability(runningOptions))
             .flatMap(message -> messageIdManager.getMessagesReactive(List.of(message.getMessageId()), FetchGroup.FULL_CONTENT, mailboxSession), ReactorUtils.DEFAULT_CONCURRENCY)
-            .filter(runningOptions.correspondingClassificationFilter());
+            .filter(runningOptions.correspondingClassificationFilter())
+            .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
     }
 
     private Flux<MessageResult> getMailboxMessagesOfAUser(Username username, MailboxMetaData mailboxMetaData, Optional<Date> afterDate,
@@ -119,7 +120,8 @@ public class GetMailboxMessagesService {
             .doOnNext(mailboxMessageMetaData -> context.incrementHamMessageCount())
             .filter(message -> randomBooleanWithProbability(runningOptions))
             .flatMap(message -> messageIdManager.getMessagesReactive(List.of(message.getMessageId()), FetchGroup.FULL_CONTENT, mailboxSession), ReactorUtils.DEFAULT_CONCURRENCY)
-            .filter(runningOptions.correspondingClassificationFilter());
+            .filter(runningOptions.correspondingClassificationFilter())
+            .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession));
     }
 
     public static boolean randomBooleanWithProbability(RunningOptions runningOptions) {
diff --git a/third-party/spamassassin/src/main/java/org/apache/james/spamassassin/SpamAssassinListener.java b/third-party/spamassassin/src/main/java/org/apache/james/spamassassin/SpamAssassinListener.java
index 19a86262df..cace69aef6 100644
--- a/third-party/spamassassin/src/main/java/org/apache/james/spamassassin/SpamAssassinListener.java
+++ b/third-party/spamassassin/src/main/java/org/apache/james/spamassassin/SpamAssassinListener.java
@@ -97,10 +97,12 @@ public class SpamAssassinListener implements SpamEventListener {
         if (event instanceof MessageMoveEvent) {
             MailboxSession session = mailboxManager.createSystemSession(username);
             handleMessageMove(event, session, (MessageMoveEvent) event);
+            mailboxManager.endProcessingRequest(session);
         }
         if (event instanceof Added) {
             MailboxSession session = mailboxManager.createSystemSession(username);
             handleAdded(event, session, (Added) event);
+            mailboxManager.endProcessingRequest(session);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to