This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 2c53f0130cca2d5fd3aa5fde08f61d5a261abb81 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Fri Jun 2 17:07:37 2023 +0700 [FIX] JPA: avoid leaks in entity manager --- .../listeners/SetCustomFlagOnBigMessages.java | 1 + .../james/mailbox/backup/DefaultMailboxBackup.java | 16 +++++---- .../mailbox/backup/ZipMailArchiveRestorer.java | 1 + .../CassandraMailboxSessionMapperFactory.java | 5 +++ ...CassandraRecomputeCurrentQuotasServiceTest.java | 2 +- .../task/JPARecomputeCurrentQuotasServiceTest.java | 2 +- .../MemoryRecomputeCurrentQuotasServiceTest.java | 4 ++- .../mailbox/store/SystemMailboxesProviderImpl.java | 3 +- .../store/event/MailboxAnnotationListener.java | 3 +- .../store/quota/DefaultUserQuotaRootResolver.java | 6 ++-- .../store/search/ListeningMessageSearchIndex.java | 6 ++-- .../store/event/MailboxAnnotationListenerTest.java | 3 ++ .../mailbox/tools/indexer/ReIndexerImpl.java | 1 + .../mailbox/tools/indexer/ReIndexerPerformer.java | 18 ++++++---- .../quota/task/RecomputeCurrentQuotasService.java | 9 +++-- .../james/healthcheck/MailReceptionCheck.java | 3 +- .../adapter/mailbox/ACLUsernameChangeTaskStep.java | 7 ++-- .../mailbox/MailboxUserDeletionTaskStep.java | 6 ++-- .../mailbox/MailboxUsernameChangeTaskStep.java | 16 ++++++--- .../james/transport/mailets/RandomStoring.java | 3 +- .../jmap/draft/send/PostDequeueDecorator.java | 1 + .../james/jmap/mailet/filter/ActionApplier.java | 1 + .../webadmin/vault/routes/RestoreService.java | 3 +- .../webadmin/service/CreateMissingParentsTask.java | 15 ++++---- .../webadmin/service/ExpireMailboxService.java | 3 +- .../webadmin/service/UserMailboxesService.java | 41 ++++++++++++++++------ .../org/apache/james/rspamd/RspamdListener.java | 10 ++++-- .../rspamd/task/GetMailboxMessagesService.java | 6 ++-- .../james/spamassassin/SpamAssassinListener.java | 2 ++ 29 files changed, 139 insertions(+), 58 deletions(-) diff --git 
a/examples/custom-listeners/src/main/java/org/apache/james/examples/custom/listeners/SetCustomFlagOnBigMessages.java b/examples/custom-listeners/src/main/java/org/apache/james/examples/custom/listeners/SetCustomFlagOnBigMessages.java index d7223c1d6b..192e6df1a0 100644 --- a/examples/custom-listeners/src/main/java/org/apache/james/examples/custom/listeners/SetCustomFlagOnBigMessages.java +++ b/examples/custom-listeners/src/main/java/org/apache/james/examples/custom/listeners/SetCustomFlagOnBigMessages.java @@ -86,6 +86,7 @@ class SetCustomFlagOnBigMessages implements EventListener.GroupEventListener { FlagsUpdateMode.ADD, MessageRange.one(messageUid), session); + mailboxManager.endProcessingRequest(session); } catch (MailboxException e) { LOGGER.error("error happens when adding '{}' flag to the message with uid {} in mailbox {} of user {}", BIG_MESSAGE, messageUid.asLong(), addedEvent.getMailboxId(), addedEvent.getUsername().asString(), e); diff --git a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java index 0c9fd32bcd..bc6697a88a 100644 --- a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java +++ b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/DefaultMailboxBackup.java @@ -32,7 +32,6 @@ import org.apache.james.core.Username; import org.apache.james.mailbox.MailboxManager; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MessageManager; -import org.apache.james.mailbox.exception.BadCredentialsException; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.FetchGroup; import org.apache.james.mailbox.model.Mailbox; @@ -98,14 +97,19 @@ public class DefaultMailboxBackup implements MailboxBackup { Stream<MessageResult> messages = allMessagesForUser(accountContents); archive(mailboxes, messages, destination); + 
mailboxManager.endProcessingRequest(session); } - private boolean isAccountNonEmpty(Username username) throws BadCredentialsException, MailboxException, IOException { + private boolean isAccountNonEmpty(Username username) throws MailboxException { MailboxSession session = mailboxManager.createSystemSession(username); - return getAccountContentForUser(session) - .stream() - .findFirst() - .isPresent(); + try { + return getAccountContentForUser(session) + .stream() + .findFirst() + .isPresent(); + } finally { + mailboxManager.endProcessingRequest(session); + } } @Override diff --git a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/ZipMailArchiveRestorer.java b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/ZipMailArchiveRestorer.java index 5830c69019..215f7b19bf 100644 --- a/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/ZipMailArchiveRestorer.java +++ b/mailbox/backup/src/main/java/org/apache/james/mailbox/backup/ZipMailArchiveRestorer.java @@ -58,6 +58,7 @@ public class ZipMailArchiveRestorer implements MailArchiveRestorer { public void restore(Username username, InputStream source) throws MailboxException, IOException { MailboxSession session = mailboxManager.createSystemSession(username); restoreEntries(source, session); + mailboxManager.endProcessingRequest(session); } private void restoreEntries(InputStream source, MailboxSession session) throws IOException { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java index 84d86b1882..fb3069869b 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java @@ -231,6 +231,11 @@ public class CassandraMailboxSessionMapperFactory 
extends MailboxSessionMapperFa return cassandraSubscriptionMapper; } + @Override + public void endProcessingRequest(MailboxSession session) { + + } + public DeleteMessageListener deleteMessageListener() { return new DeleteMessageListener(threadDAO, threadLookupDAO, imapUidDAO, messageIdDAO, messageDAO, messageDAOV3, attachmentDAOV2, attachmentMessageIdDAO, aclMapper, userMailboxRightsDAO, applicableFlagDAO, firstUnseenDAO, deletedMessageDAO, diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java index 113b741ed4..9b5d3b628a 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java @@ -86,7 +86,7 @@ public class CassandraRecomputeCurrentQuotasServiceTest implements RecomputeCurr userQuotaRootResolver = new DefaultUserQuotaRootResolver(sessionProvider, mapperFactory); CurrentQuotaCalculator currentQuotaCalculator = new CurrentQuotaCalculator(mapperFactory, userQuotaRootResolver); - testee = new RecomputeCurrentQuotasService(usersRepository, currentQuotaManager, currentQuotaCalculator, userQuotaRootResolver, sessionProvider); + testee = new RecomputeCurrentQuotasService(usersRepository, currentQuotaManager, currentQuotaCalculator, userQuotaRootResolver, sessionProvider, mailboxManager); } @Override diff --git a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/task/JPARecomputeCurrentQuotasServiceTest.java b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/task/JPARecomputeCurrentQuotasServiceTest.java index 5d5dfbf696..43925070a6 100644 --- 
a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/task/JPARecomputeCurrentQuotasServiceTest.java +++ b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/task/JPARecomputeCurrentQuotasServiceTest.java @@ -87,7 +87,7 @@ class JPARecomputeCurrentQuotasServiceTest implements RecomputeCurrentQuotasServ CurrentQuotaCalculator currentQuotaCalculator = new CurrentQuotaCalculator(mapperFactory, userQuotaRootResolver); - testee = new RecomputeCurrentQuotasService(usersRepository, currentQuotaManager, currentQuotaCalculator, userQuotaRootResolver, sessionProvider); + testee = new RecomputeCurrentQuotasService(usersRepository, currentQuotaManager, currentQuotaCalculator, userQuotaRootResolver, sessionProvider, mailboxManager); } @AfterEach diff --git a/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/mail/task/MemoryRecomputeCurrentQuotasServiceTest.java b/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/mail/task/MemoryRecomputeCurrentQuotasServiceTest.java index 328fccb798..2fa06bde62 100644 --- a/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/mail/task/MemoryRecomputeCurrentQuotasServiceTest.java +++ b/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/mail/task/MemoryRecomputeCurrentQuotasServiceTest.java @@ -26,6 +26,7 @@ import org.apache.james.domainlist.lib.DomainListConfiguration; import org.apache.james.domainlist.memory.MemoryDomainList; import org.apache.james.mailbox.MailboxManager; import org.apache.james.mailbox.SessionProvider; +import org.apache.james.mailbox.inmemory.InMemoryMailboxManager; import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources; import org.apache.james.mailbox.quota.CurrentQuotaManager; import org.apache.james.mailbox.quota.UserQuotaRootResolver; @@ -52,7 +53,8 @@ class MemoryRecomputeCurrentQuotasServiceTest implements RecomputeCurrentQuotasS usersRepository = MemoryUsersRepository.withoutVirtualHosting(memoryDomainList); resources = 
InMemoryIntegrationResources.defaultResources(); - testee = new RecomputeCurrentQuotasService(usersRepository, resources.getCurrentQuotaManager(), resources.getCurrentQuotaCalculator(), resources.getDefaultUserQuotaRootResolver(), resources.getMailboxManager().getSessionProvider()); + InMemoryMailboxManager mailboxManager = resources.getMailboxManager(); + testee = new RecomputeCurrentQuotasService(usersRepository, resources.getCurrentQuotaManager(), resources.getCurrentQuotaCalculator(), resources.getDefaultUserQuotaRootResolver(), mailboxManager.getSessionProvider(), mailboxManager); } @Override diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/SystemMailboxesProviderImpl.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/SystemMailboxesProviderImpl.java index 3b6a2fbd7c..a1518902b1 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/SystemMailboxesProviderImpl.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/SystemMailboxesProviderImpl.java @@ -57,7 +57,8 @@ public class SystemMailboxesProviderImpl implements SystemMailboxesProvider { return Mono.from(mailboxManager.getMailboxReactive(mailboxPath, session)) .flux() - .onErrorResume(MailboxNotFoundException.class, e -> searchMessageManagerByMailboxRole(aRole, username)); + .onErrorResume(MailboxNotFoundException.class, e -> searchMessageManagerByMailboxRole(aRole, username)) + .doFinally(any -> mailboxManager.endProcessingRequest(session)); } private boolean hasRole(Role aRole, MailboxPath mailBoxPath) { diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxAnnotationListener.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxAnnotationListener.java index 6385f9caf2..8d11cb6ec9 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxAnnotationListener.java +++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxAnnotationListener.java @@ -69,7 +69,8 @@ public class MailboxAnnotationListener implements EventListener.ReactiveGroupEve return Flux.from(annotationMapper.getAllAnnotationsReactive(mailboxId)) .flatMap(annotation -> Mono.from(annotationMapper.deleteAnnotationReactive(mailboxId, annotation.getKey()))) - .then(); + .then() + .doFinally(any -> mailboxSessionMapperFactory.endProcessingRequest(mailboxSession)); } return Mono.empty(); } diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java index 00506ca094..db58bf198f 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java @@ -121,7 +121,8 @@ public class DefaultUserQuotaRootResolver implements UserQuotaRootResolver { return factory.getMailboxMapper(session) .findMailboxById(mailboxId) .map(Mailbox::generateAssociatedPath) - .flatMap(path -> Mono.from(getQuotaRootReactive(path))); + .flatMap(path -> Mono.from(getQuotaRootReactive(path))) + .doFinally(any -> factory.endProcessingRequest(session)); } @Override @@ -138,7 +139,8 @@ public class DefaultUserQuotaRootResolver implements UserQuotaRootResolver { .flatMap(this::getQuotaRootReactive, ReactorUtils.DEFAULT_CONCURRENCY) .distinct(); - return Flux.concat(quotaRootListFromDelegatedMailboxes, Flux.just(forUser(username))); + return Flux.concat(quotaRootListFromDelegatedMailboxes, Flux.just(forUser(username))) + .doFinally(any -> factory.endProcessingRequest(session)); } @Override diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java index 3eb17a3661..5fb5019d56 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java @@ -95,9 +95,9 @@ public abstract class ListeningMessageSearchIndex implements MessageSearchIndex, */ @Override public Mono<Void> reactiveEvent(Event event) { - return handleMailboxEvent(event, - sessionProvider.createSystemSession(event.getUsername()), - (MailboxEvent) event); + MailboxSession systemSession = sessionProvider.createSystemSession(event.getUsername()); + return handleMailboxEvent(event, systemSession, (MailboxEvent) event) + .then(Mono.fromRunnable(() -> factory.endProcessingRequest(systemSession))); } private Mono<Void> handleMailboxEvent(Event event, MailboxSession session, MailboxEvent mailboxEvent) { diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/MailboxAnnotationListenerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/MailboxAnnotationListenerTest.java index 1c310a5447..19739fdac7 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/MailboxAnnotationListenerTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/MailboxAnnotationListenerTest.java @@ -125,6 +125,7 @@ class MailboxAnnotationListenerTest { listener.event(deleteEvent); verify(mailboxSessionMapperFactory).getAnnotationMapper(eq(mailboxSession)); + verify(mailboxSessionMapperFactory).endProcessingRequest(eq(mailboxSession)); verify(annotationMapper).getAllAnnotationsReactive(eq(mailboxId)); verifyNoMoreInteractions(mailboxSessionMapperFactory); @@ -140,6 +141,7 @@ class MailboxAnnotationListenerTest { listener.event(deleteEvent); verify(mailboxSessionMapperFactory).getAnnotationMapper(eq(mailboxSession)); + 
verify(mailboxSessionMapperFactory).endProcessingRequest(eq(mailboxSession)); verify(annotationMapper).getAllAnnotationsReactive(eq(mailboxId)); verify(annotationMapper).deleteAnnotationReactive(eq(mailboxId), eq(PRIVATE_KEY)); verify(annotationMapper).deleteAnnotationReactive(eq(mailboxId), eq(SHARED_KEY)); @@ -158,6 +160,7 @@ class MailboxAnnotationListenerTest { assertThatThrownBy(() -> listener.event(deleteEvent)).isInstanceOf(RuntimeException.class); verify(mailboxSessionMapperFactory).getAnnotationMapper(eq(mailboxSession)); + verify(mailboxSessionMapperFactory).endProcessingRequest(eq(mailboxSession)); verify(annotationMapper).getAllAnnotationsReactive(eq(mailboxId)); verify(annotationMapper).deleteAnnotationReactive(eq(mailboxId), eq(PRIVATE_KEY)); diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java index f557602cd1..d4adbfa34f 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java @@ -106,5 +106,6 @@ public class ReIndexerImpl implements ReIndexer { private void validateIdExists(MailboxId mailboxId) throws MailboxException { MailboxSession mailboxSession = mailboxManager.createSystemSession(Username.of("ReIndexingImap")); MailboxReactorUtils.block(mapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId)); + mailboxManager.endProcessingRequest(mailboxSession); } } diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java index eafd50fe2a..9a1d779bc1 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java +++ 
b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java @@ -155,7 +155,8 @@ public class ReIndexerPerformer { .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, runningOptions), MAILBOX_CONCURRENCY); return reIndexMessages(entriesToIndex, runningOptions, reIndexingContext) - .doFinally(any -> LOGGER.info("Full reindex finished")); + .doFinally(any -> LOGGER.info("Full reindex finished")) + .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession)); } Mono<Result> reIndexSingleMailbox(MailboxId mailboxId, ReIndexingContext reIndexingContext, RunningOptions runningOptions) { @@ -165,7 +166,8 @@ public class ReIndexerPerformer { .findMailboxById(mailboxId) .flatMapMany(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, runningOptions)); - return reIndexMessages(entriesToIndex, runningOptions, reIndexingContext); + return reIndexMessages(entriesToIndex, runningOptions, reIndexingContext) + .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession)); } Mono<Result> reIndexUserMailboxes(Username username, ReIndexingContext reIndexingContext, RunningOptions runningOptions) { @@ -180,7 +182,8 @@ public class ReIndexerPerformer { .flatMap(mailbox -> reIndexingEntriesForMailbox(mailbox, mailboxSession, runningOptions), MAILBOX_CONCURRENCY); return reIndexMessages(entriesToIndex, runningOptions, reIndexingContext) - .doFinally(any -> LOGGER.info("User {} reindex finished", username.asString())); + .doFinally(any -> LOGGER.info("User {} reindex finished", username.asString())) + .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession)); } catch (Exception e) { LOGGER.error("Error fetching mailboxes for user: {}", username.asString()); return Mono.just(Result.PARTIAL); @@ -195,7 +198,8 @@ public class ReIndexerPerformer { .map(mailbox -> new ReIndexingEntry(mailbox, mailboxSession, uid)) .flatMap(this::fullyReadMessage) .flatMap(message -> reIndex(message, 
mailboxSession)) - .switchIfEmpty(Mono.just(Result.COMPLETED)); + .switchIfEmpty(Mono.just(Result.COMPLETED)) + .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession)); } Mono<Result> reIndexMessageId(MessageId messageId) { @@ -209,7 +213,8 @@ public class ReIndexerPerformer { .onErrorResume(e -> { LOGGER.warn("Failed to re-index {}", messageId, e); return Mono.just(Result.PARTIAL); - }); + }) + .doFinally(any -> mailboxManager.endProcessingRequest(session)); } Mono<Result> reIndexErrors(ReIndexingContext reIndexingContext, ReIndexingExecutionFailures previousReIndexingFailures, RunningOptions runningOptions) { @@ -227,7 +232,8 @@ public class ReIndexerPerformer { return Mono.just(Either.left(new MailboxFailure(mailboxId))); }), MAILBOX_CONCURRENCY)); - return reIndexMessages(entriesToIndex, runningOptions, reIndexingContext); + return reIndexMessages(entriesToIndex, runningOptions, reIndexingContext) + .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession)); } private Mono<Result> reIndex(MailboxMessage mailboxMessage, MailboxSession session) { diff --git a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java index acd288dfe7..f2d1a9c431 100644 --- a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java +++ b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.inject.Inject; import org.apache.james.core.Username; +import org.apache.james.mailbox.MailboxManager; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.SessionProvider; import org.apache.james.mailbox.model.QuotaOperation; @@ -147,18 +148,21 @@ public class 
RecomputeCurrentQuotasService { private final CurrentQuotaCalculator currentQuotaCalculator; private final UserQuotaRootResolver userQuotaRootResolver; private final SessionProvider sessionProvider; + private final MailboxManager mailboxManager; @Inject public RecomputeCurrentQuotasService(UsersRepository usersRepository, CurrentQuotaManager storeCurrentQuotaManager, CurrentQuotaCalculator currentQuotaCalculator, UserQuotaRootResolver userQuotaRootResolver, - SessionProvider sessionProvider) { + SessionProvider sessionProvider, + MailboxManager mailboxManager) { this.usersRepository = usersRepository; this.storeCurrentQuotaManager = storeCurrentQuotaManager; this.currentQuotaCalculator = currentQuotaCalculator; this.userQuotaRootResolver = userQuotaRootResolver; this.sessionProvider = sessionProvider; + this.mailboxManager = mailboxManager; } public Mono<Task.Result> recomputeCurrentQuotas(Context context, RunningOptions runningOptions) { @@ -190,6 +194,7 @@ public class RecomputeCurrentQuotasService { LOGGER.error("Error while recomputing current quotas for {}", quotaRoot, e); context.addToFailedMailboxes(quotaRoot); return Mono.just(Task.Result.PARTIAL); - }); + }) + .doFinally(any -> mailboxManager.endProcessingRequest(session)); } } diff --git a/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java b/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java index c8f31d0844..8d865ee097 100644 --- a/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java +++ b/server/container/feature-checks/src/main/java/org/apache/james/healthcheck/MailReceptionCheck.java @@ -223,7 +223,8 @@ public class MailReceptionCheck implements HealthCheck { .onErrorResume(e -> { LOGGER.error("Mail reception check failed", e); return Mono.just(Result.unhealthy(componentName(), e.getMessage())); - }); + }) + .doFinally(any -> 
mailboxManager.endProcessingRequest(session)); } private Mono<MessageManager> retrieveInbox(Username username, MailboxSession session) { diff --git a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/ACLUsernameChangeTaskStep.java b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/ACLUsernameChangeTaskStep.java index 813dc8a043..7027645eb0 100644 --- a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/ACLUsernameChangeTaskStep.java +++ b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/ACLUsernameChangeTaskStep.java @@ -67,7 +67,9 @@ public class ACLUsernameChangeTaskStep implements UsernameChangeTaskStep { return mailboxManager.search(MailboxQuery.builder().matchesAllMailboxNames().build(), oldSession) .filter(mailbox -> !mailbox.getPath().getUser().equals(oldUsername)) .concatMap(mailbox -> migrateACLs(oldUsername, newUsername, mailbox)) - .then(updateSubscriptionsOnDeletedMailboxes(oldUsername, oldSession, newSession)); + .then(updateSubscriptionsOnDeletedMailboxes(oldUsername, oldSession, newSession)) + .doFinally(any -> mailboxManager.endProcessingRequest(oldSession)) + .doFinally(any -> mailboxManager.endProcessingRequest(newSession)); } private Mono<Void> updateSubscriptionsOnDeletedMailboxes(Username oldUsername, MailboxSession oldSession, MailboxSession newSession) { @@ -90,6 +92,7 @@ public class ACLUsernameChangeTaskStep implements UsernameChangeTaskStep { return Mono.fromRunnable(Throwing.runnable(() -> mailboxManager.applyRightsCommand(mailbox.getId(), MailboxACL.command().rights(rights).forUser(newUsername).asAddition(), ownerSession))) .then(Mono.fromRunnable(Throwing.runnable(() -> mailboxManager.applyRightsCommand(mailbox.getId(), MailboxACL.command().rights(rights).forUser(oldUsername).asRemoval(), ownerSession)))) .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER) - .then(); + .then() + .doFinally(any -> 
mailboxManager.endProcessingRequest(ownerSession)); } } diff --git a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUserDeletionTaskStep.java b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUserDeletionTaskStep.java index 035686decf..d916fbf9b4 100644 --- a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUserDeletionTaskStep.java +++ b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUserDeletionTaskStep.java @@ -71,7 +71,8 @@ public class MailboxUserDeletionTaskStep implements DeleteUserDataTaskStep { .then(getSharedMailboxesOfUser(mailboxSession) .flatMap(sharedMailbox -> revokeACLs(username, sharedMailbox) .then(deleteSubscription(mailboxSession, sharedMailbox))) - .then()); + .then()) + .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession)); } private Flux<MailboxMetaData> getAllMailboxesOfUser(MailboxSession mailboxSession) { @@ -103,7 +104,8 @@ public class MailboxUserDeletionTaskStep implements DeleteUserDataTaskStep { return Mono.fromRunnable(Throwing.runnable(() -> mailboxManager.applyRightsCommand(mailbox.getId(), MailboxACL.command().rights(rights).forUser(username).asRemoval(), ownerSession))) .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER) - .then(); + .then() + .doFinally(any -> mailboxManager.endProcessingRequest(ownerSession)); } } diff --git a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java index cdf02d268a..a28d9544ad 100644 --- a/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java +++ b/server/container/mailbox-adapter/src/main/java/org/apache/james/adapter/mailbox/MailboxUsernameChangeTaskStep.java @@ -72,7 +72,9 @@ public class 
MailboxUsernameChangeTaskStep implements UsernameChangeTaskStep { return mailboxManager.search(queryUser, MailboxManager.MailboxSearchFetchType.Minimal, fromSession) // Only keep top level, rename takes care of sub mailboxes .filter(mailbox -> mailbox.getPath().getHierarchyLevels(fromSession.getPathDelimiter()).size() == 1) - .concatMap(mailbox -> migrateMailbox(fromSession, toSession, mailbox)); + .concatMap(mailbox -> migrateMailbox(fromSession, toSession, mailbox)) + .doFinally(any -> mailboxManager.endProcessingRequest(fromSession)) + .doFinally(any -> mailboxManager.endProcessingRequest(toSession)); } private Mono<Void> migrateMailbox(MailboxSession fromSession, MailboxSession toSession, org.apache.james.mailbox.model.MailboxMetaData mailbox) { @@ -115,16 +117,20 @@ public class MailboxUsernameChangeTaskStep implements UsernameChangeTaskStep { return Flux.fromIterable(mailbox.getResolvedAcls().getEntries().entrySet()) .filter(entry -> entry.getKey().getNameType() == MailboxACL.NameType.user && !entry.getKey().isNegative()) .map(entry -> Username.of(entry.getKey().getName())) - .concatMap(Throwing.function(userWithAccess -> - Flux.from(subscriptionManager.subscriptionsReactive(mailboxManager.createSystemSession(userWithAccess))) + .concatMap(Throwing.function(userWithAccess -> { + MailboxSession session = mailboxManager.createSystemSession(userWithAccess); + return Flux.from(subscriptionManager.subscriptionsReactive(session)) .filter(subscribedMailbox -> subscribedMailbox.equals(mailbox.getPath())) - .concatMap(any -> renameSubscription(mailbox, renamedPath, userWithAccess)))) + .concatMap(any -> renameSubscription(mailbox, renamedPath, userWithAccess)) + .doFinally(any -> mailboxManager.endProcessingRequest(session)); + })) .then(); } private Mono<Void> renameSubscription(MailboxMetaData mailbox, MailboxPath renamedPath, Username user) { MailboxSession session = mailboxManager.createSystemSession(user); return 
Mono.from(subscriptionManager.subscribeReactive(renamedPath, session)) - .then(Mono.from(subscriptionManager.unsubscribeReactive(mailbox.getPath(), session))); + .then(Mono.from(subscriptionManager.unsubscribeReactive(mailbox.getPath(), session))) + .doFinally(any -> mailboxManager.endProcessingRequest(session)); } } diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java index 6b817023d3..c1eb916848 100644 --- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java +++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java @@ -122,7 +122,8 @@ public class RandomStoring extends GenericMailet { MailboxSession session = mailboxManager.createSystemSession(username); return mailboxManager .search(MailboxQuery.privateMailboxesBuilder(session).build(), Minimal, session) - .map(metaData -> new ReroutingInfos(metaData.getPath().getName(), username)); + .map(metaData -> new ReroutingInfos(metaData.getPath().getName(), username)) + .doFinally(any -> mailboxManager.endProcessingRequest(session)); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java index 44daed400f..be51824591 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java @@ -88,6 +88,7 @@ public class PostDequeueDecorator extends MailQueueItemDecorator { MailboxSession mailboxSession = mailboxManager.createSystemSession(Username.of(username.get())); moveFromOutboxToSentWithSeenFlag(messageId, mailboxSession); 
getMail().setAttribute(IS_DELIVERED); + mailboxManager.endProcessingRequest(mailboxSession); } catch (MailShouldBeInOutboxException e) { LOG.info("Message does not exist on Outbox anymore, it could have already been sent", e); } diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/ActionApplier.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/ActionApplier.java index 5dcbc2b432..d9845cc581 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/ActionApplier.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/ActionApplier.java @@ -124,6 +124,7 @@ public class ActionApplier { MailboxId mailboxId = mailboxIdFactory.fromString(mailboxIdString); MailboxSession mailboxSession = mailboxManager.createSystemSession(username); MessageManager messageManager = mailboxManager.getMailbox(mailboxId, mailboxSession); + mailboxManager.endProcessingRequest(mailboxSession); return Stream.of(messageManager.getMailboxPath().getName()); } catch (MailboxNotFoundException e) { diff --git a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/RestoreService.java b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/RestoreService.java index 554e273434..bca7bcfeea 100644 --- a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/RestoreService.java +++ b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/RestoreService.java @@ -79,7 +79,8 @@ public class RestoreService { MessageManager restoreMessageManager = restoreMailboxManager(session); return Flux.from(deletedMessageVault.search(usernameToRestore, searchQuery)) - .flatMap(deletedMessage -> appendToMailbox(restoreMessageManager, 
deletedMessage, session), DEFAULT_CONCURRENCY); + .flatMap(deletedMessage -> appendToMailbox(restoreMessageManager, deletedMessage, session), DEFAULT_CONCURRENCY) + .doFinally(any -> mailboxManager.endProcessingRequest(session)); } private Mono<RestoreResult> appendToMailbox(MessageManager restoreMailboxManager, DeletedMessage deletedMessage, MailboxSession session) { diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/CreateMissingParentsTask.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/CreateMissingParentsTask.java index d8da1c6eba..c92b172c24 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/CreateMissingParentsTask.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/CreateMissingParentsTask.java @@ -153,6 +153,8 @@ public class CreateMissingParentsTask implements Task { } catch (MailboxException e) { LOGGER.error("Error fetching mailbox paths", e); return Result.PARTIAL; + } finally { + mailboxManager.endProcessingRequest(session); } } @@ -160,12 +162,13 @@ public class CreateMissingParentsTask implements Task { MailboxSession ownerSession = mailboxManager.createSystemSession(path.getUser()); return Mono.from(mailboxManager.createMailboxReactive(path, ownerSession)) .doOnNext(this::recordSuccess) - .then(Mono.just(Result.COMPLETED)) - .onErrorResume(e -> { - LOGGER.error("Error creating missing parent mailbox: {}", path.getName(), e); - recordFailure(path); - return Mono.just(Result.PARTIAL); - }); + .then(Mono.just(Result.COMPLETED)) + .onErrorResume(e -> { + LOGGER.error("Error creating missing parent mailbox: {}", path.getName(), e); + recordFailure(path); + return Mono.just(Result.PARTIAL); + }) + .doFinally(any -> mailboxManager.endProcessingRequest(ownerSession)); } @Override diff --git 
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java index fc709d4bba..526cbdae45 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxService.java @@ -217,7 +217,8 @@ public class ExpireMailboxService { context.incrementFailedCount(); context.incrementProcessedCount(); return Mono.just(Task.Result.PARTIAL); - }); + }) + .doFinally(any -> mailboxManager.endProcessingRequest(session)); } private Mono<List<MessageUid>> searchMessagesReactive(MessageManager mgr, MailboxSession session, SearchQuery expiration) { diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/UserMailboxesService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/UserMailboxesService.java index a7bf0d7291..e57048f5ac 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/UserMailboxesService.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/UserMailboxesService.java @@ -74,6 +74,7 @@ public class UserMailboxesService { MailboxPath mailboxPath = MailboxPath.forUser(username, mailboxName.asString()) .assertAcceptable(mailboxSession.getPathDelimiter()); mailboxManager.createMailbox(mailboxPath, mailboxSession); + mailboxManager.endProcessingRequest(mailboxSession); } catch (MailboxExistsException e) { LOGGER.info("Attempt to create mailbox {} for user {} that already exists", mailboxName, username); } @@ -85,14 +86,19 @@ public class UserMailboxesService { listUserMailboxes(mailboxSession) .map(MailboxMetaData::getPath) 
.forEach(Throwing.consumer(mailboxPath -> deleteMailbox(mailboxSession, mailboxPath))); + mailboxManager.endProcessingRequest(mailboxSession); } public List<MailboxResponse> listMailboxes(Username username) throws UsersRepositoryException { usernamePreconditions(username); MailboxSession mailboxSession = mailboxManager.createSystemSession(username); - return listUserMailboxes(mailboxSession) - .map(mailboxMetaData -> new MailboxResponse(mailboxMetaData.getPath().getName(), mailboxMetaData.getId())) - .collect(ImmutableList.toImmutableList()); + try { + return listUserMailboxes(mailboxSession) + .map(mailboxMetaData -> new MailboxResponse(mailboxMetaData.getPath().getName(), mailboxMetaData.getId())) + .collect(ImmutableList.toImmutableList()); + } finally { + mailboxManager.endProcessingRequest(mailboxSession); + } } public boolean testMailboxExists(Username username, MailboxName mailboxName) throws MailboxException, UsersRepositoryException { @@ -100,8 +106,12 @@ public class UserMailboxesService { MailboxSession mailboxSession = mailboxManager.createSystemSession(username); MailboxPath mailboxPath = MailboxPath.forUser(username, mailboxName.asString()) .assertAcceptable(mailboxSession.getPathDelimiter()); - return Mono.from(mailboxManager.mailboxExists(mailboxPath, mailboxSession)) - .block(); + try { + return Mono.from(mailboxManager.mailboxExists(mailboxPath, mailboxSession)) + .block(); + } finally { + mailboxManager.endProcessingRequest(mailboxSession); + } } @@ -118,7 +128,8 @@ public class UserMailboxesService { return Mono.just(Result.PARTIAL); }) .reduce(Task::combine) - .switchIfEmpty(Mono.just(Result.COMPLETED)); + .switchIfEmpty(Mono.just(Result.COMPLETED)) + .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession)); } private Mono<Result> deleteMessage(MessageManager messageManager, MessageUid messageUid, MailboxSession mailboxSession, ClearMailboxContentTask.Context context) { @@ -138,20 +149,29 @@ public class UserMailboxesService { 
.assertAcceptable(mailboxSession.getPathDelimiter()); listChildren(mailboxPath, mailboxSession) .forEach(Throwing.consumer(path -> deleteMailbox(mailboxSession, path))); + mailboxManager.endProcessingRequest(mailboxSession); } public long messageCount(Username username, MailboxName mailboxName) throws UsersRepositoryException, MailboxException { usernamePreconditions(username); MailboxSession mailboxSession = mailboxManager.createSystemSession(username); - return mailboxManager.getMailbox(MailboxPath.forUser(username, mailboxName.asString()), mailboxSession).getMessageCount(mailboxSession); + try { + return mailboxManager.getMailbox(MailboxPath.forUser(username, mailboxName.asString()), mailboxSession).getMessageCount(mailboxSession); + } finally { + mailboxManager.endProcessingRequest(mailboxSession); + } } public long unseenMessageCount(Username username, MailboxName mailboxName) throws UsersRepositoryException, MailboxException { usernamePreconditions(username); MailboxSession mailboxSession = mailboxManager.createSystemSession(username); - return mailboxManager.getMailbox(MailboxPath.forUser(username, mailboxName.asString()), mailboxSession) - .getMailboxCounters(mailboxSession) - .getUnseen(); + try { + return mailboxManager.getMailbox(MailboxPath.forUser(username, mailboxName.asString()), mailboxSession) + .getMailboxCounters(mailboxSession) + .getUnseen(); + } finally { + mailboxManager.endProcessingRequest(mailboxSession); + } } private Stream<MailboxPath> listChildren(MailboxPath mailboxPath, MailboxSession mailboxSession) { @@ -178,6 +198,7 @@ public class UserMailboxesService { .assertAcceptable(mailboxSession.getPathDelimiter()); Preconditions.checkState(Boolean.TRUE.equals(Mono.from(mailboxManager.mailboxExists(mailboxPath, mailboxSession)).block()), "Mailbox does not exist. 
" + mailboxPath.asString()); + mailboxManager.endProcessingRequest(mailboxSession); } private Stream<MailboxMetaData> listUserMailboxes(MailboxSession mailboxSession) { diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java index 6af803e4c2..6d9b1c5613 100644 --- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java +++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/RspamdListener.java @@ -101,10 +101,12 @@ public class RspamdListener implements SpamEventListener, EventListener.Reactive } private Mono<Void> handleMessageAdded(MailboxEvents.Added addedEvent) { + MailboxSession mailboxSession = mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName())); return isAppendedToInbox(addedEvent) .filter(FunctionalUtils.identityPredicate()) .doOnNext(isHam -> LOGGER.debug("Ham event detected, EventId = {}", addedEvent.getEventId().getId())) - .flatMap(any -> reportHamWhenAdded(addedEvent, mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName())))); + .flatMap(any -> reportHamWhenAdded(addedEvent, mailboxSession)) + .then(Mono.fromRunnable(() -> mailboxManager.endProcessingRequest(mailboxSession))); } private Mono<Void> handleMessageMoved(MessageMoveEvent messageMoveEvent) { @@ -123,9 +125,11 @@ public class RspamdListener implements SpamEventListener, EventListener.Reactive } private Flux<ByteBuffer> mailboxMessagePublisher(MessageMoveEvent messageMoveEvent) { - return Mono.fromCallable(() -> mapperFactory.getMessageIdMapper(mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName())))) + MailboxSession mailboxSession = mailboxManager.createSystemSession(Username.of(getClass().getCanonicalName())); + return Mono.fromCallable(() -> mapperFactory.getMessageIdMapper(mailboxSession)) .flatMapMany(messageIdMapper -> 
messageIdMapper.findReactive(messageMoveEvent.getMessageIds(), MessageMapper.FetchType.FULL)) - .flatMap(MailboxMessage::getFullContentReactive); + .flatMap(MailboxMessage::getFullContentReactive) + .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession)); } private Mono<Void> handleMessageMoved(Flux<ByteBuffer> mailboxMessagesPublisher, MessageMoveEvent messageMoveEvent) { diff --git a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java index 55bbb1b089..447fd17687 100644 --- a/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java +++ b/third-party/rspamd/src/main/java/org/apache/james/rspamd/task/GetMailboxMessagesService.java @@ -102,7 +102,8 @@ public class GetMailboxMessagesService { .doOnNext(mailboxMessageMetaData -> context.incrementSpamMessageCount()) .filter(message -> randomBooleanWithProbability(runningOptions)) .flatMap(message -> messageIdManager.getMessagesReactive(List.of(message.getMessageId()), FetchGroup.FULL_CONTENT, mailboxSession), ReactorUtils.DEFAULT_CONCURRENCY) - .filter(runningOptions.correspondingClassificationFilter()); + .filter(runningOptions.correspondingClassificationFilter()) + .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession)); } private Flux<MessageResult> getMailboxMessagesOfAUser(Username username, MailboxMetaData mailboxMetaData, Optional<Date> afterDate, @@ -119,7 +120,8 @@ public class GetMailboxMessagesService { .doOnNext(mailboxMessageMetaData -> context.incrementHamMessageCount()) .filter(message -> randomBooleanWithProbability(runningOptions)) .flatMap(message -> messageIdManager.getMessagesReactive(List.of(message.getMessageId()), FetchGroup.FULL_CONTENT, mailboxSession), ReactorUtils.DEFAULT_CONCURRENCY) - .filter(runningOptions.correspondingClassificationFilter()); + 
.filter(runningOptions.correspondingClassificationFilter()) + .doFinally(any -> mailboxManager.endProcessingRequest(mailboxSession)); } public static boolean randomBooleanWithProbability(RunningOptions runningOptions) { diff --git a/third-party/spamassassin/src/main/java/org/apache/james/spamassassin/SpamAssassinListener.java b/third-party/spamassassin/src/main/java/org/apache/james/spamassassin/SpamAssassinListener.java index 19a86262df..cace69aef6 100644 --- a/third-party/spamassassin/src/main/java/org/apache/james/spamassassin/SpamAssassinListener.java +++ b/third-party/spamassassin/src/main/java/org/apache/james/spamassassin/SpamAssassinListener.java @@ -97,10 +97,12 @@ public class SpamAssassinListener implements SpamEventListener { if (event instanceof MessageMoveEvent) { MailboxSession session = mailboxManager.createSystemSession(username); handleMessageMove(event, session, (MessageMoveEvent) event); + mailboxManager.endProcessingRequest(session); } if (event instanceof Added) { MailboxSession session = mailboxManager.createSystemSession(username); handleAdded(event, session, (Added) event); + mailboxManager.endProcessingRequest(session); } } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org For additional commands, e-mail: notifications-help@james.apache.org