This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit c4b9d184dad70899baa7eee3ab3474d8c4f7eb6c
Author: Rémi KOWALSKI <rkowal...@linagora.com>
AuthorDate: Thu Nov 14 13:53:17 2019 +0100

    JAMES-2979 fix concurrent dequeue for MemoryMailQueue
---
 .../java/org/apache/james/queue/memory/MemoryMailQueueFactory.java | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
index b012f7f..2066a7c 100644
--- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
+++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
@@ -55,6 +55,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;

 public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQueue> {

@@ -96,7 +97,9 @@ public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQu
             this.name = name;
             this.flux = Mono.fromCallable(mailItems::take)
                 .repeat()
-                .flatMap(item -> Mono.just(inProcessingMailItems.add(item)).thenReturn(item))
+                .subscribeOn(Schedulers.boundedElastic())
+                .flatMap(item ->
+                    Mono.fromRunnable(() -> inProcessingMailItems.add(item)).thenReturn(item))
                 .map(mailQueueItemDecoratorFactory::decorate);
         }

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org