This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 20b135341052d2630f46940de1eb7ea679c4ea95 Author: RĂ©mi KOWALSKI <rkowal...@linagora.com> AuthorDate: Thu Nov 21 13:32:59 2019 +0100 JAMES-2979 fix spooler reactor usage --- .../mailetcontainer/impl/JamesMailSpooler.java | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java index 2bd7ed0..2fb1799 100644 --- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java +++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java @@ -78,6 +78,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB private final MailQueueFactory<?> queueFactory; private reactor.core.Disposable disposable; private Scheduler spooler; + private int parallelismLevel; @Inject public JamesMailSpooler(MetricFactory metricFactory, MailProcessor mailProcessor, MailQueueFactory<?> queueFactory) { @@ -89,6 +90,9 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB @Override public void configure(HierarchicalConfiguration<ImmutableNode> config) { numThreads = config.getInt("threads", 100); + //Reactor helps us run things in parallel but we have to ensure there are always threads available + //in the threadpool to avoid starvation. + parallelismLevel = Math.max(1, numThreads - 2); } /** @@ -105,19 +109,18 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB private void run() { LOGGER.info("Queue={}", queue); - disposable = Flux.from(queue.deQueue()) - .flatMap(item -> handleOnQueueItem(item).subscribeOn(spooler)) + .flatMap(item -> handleOnQueueItem(item).subscribeOn(spooler), parallelismLevel) .onErrorContinue((throwable, item) -> LOGGER.error("Exception processing mail while spooling {}", item, throwable)) - .subscribeOn(Schedulers.boundedElastic()) + .subscribeOn(spooler) .subscribe(); } private Mono<Void> handleOnQueueItem(MailQueueItem queueItem) { TimeMetric timeMetric = metricFactory.timer(SPOOL_PROCESSING); try { - processingActive.incrementAndGet(); - return processMail(queueItem) + return Mono.fromCallable(processingActive::incrementAndGet) + .flatMap(ignore -> processMail(queueItem)) .doOnSuccess(any -> timeMetric.stopAndPublish()) .doOnSuccess(any -> processingActive.decrementAndGet()); } catch (Throwable e) { @@ -126,14 +129,14 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB } private Mono<Void> processMail(MailQueueItem queueItem) { - Mail mail = queueItem.getMail(); - return Mono.fromRunnable(() -> LOGGER.debug("==== Begin processing mail {} ====", mail.getName())) - .subscribeOn(Schedulers.boundedElastic()) + Mono<Mail> mailPublisher = Mono.fromSupplier(queueItem::getMail); + return mailPublisher.flatMap(mail -> Mono.fromRunnable(() -> LOGGER.debug("==== Begin processing mail {} ====", mail.getName())) + .subscribeOn(spooler) .then(Mono.fromCallable(() -> performProcessMail(mail))) .flatMap(any -> acknowledgeItem(queueItem, true)) .onErrorResume(any -> acknowledgeItem(queueItem, false)) .then(Mono.fromRunnable(() -> LOGGER.debug("==== End processing mail {} ====", mail.getName()))) - .then(Mono.fromRunnable(() -> LifecycleUtil.dispose(mail))); + .then(Mono.fromRunnable(() -> LifecycleUtil.dispose(mail)))); } private Mono<Void> acknowledgeItem(MailQueueItem queueItem, boolean success) { --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org