This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4ffa7e89d101732669d8f5af4ef21806d33bad3e Author: Matthieu Baechler <matth...@apache.org> AuthorDate: Fri Dec 6 11:54:18 2019 +0100 JAMES-2979 try to simplify Spooler code --- .../mailetcontainer/impl/JamesMailSpooler.java | 35 +++++++++++----------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java index 2fb1799..63a67e7 100644 --- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java +++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java @@ -43,8 +43,6 @@ import org.apache.mailet.Mail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.github.fge.lambdas.Throwing; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -120,7 +118,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB TimeMetric timeMetric = metricFactory.timer(SPOOL_PROCESSING); try { return Mono.fromCallable(processingActive::incrementAndGet) - .flatMap(ignore -> processMail(queueItem)) + .flatMap(ignore -> processMail(queueItem).subscribeOn(spooler)) .doOnSuccess(any -> timeMetric.stopAndPublish()) .doOnSuccess(any -> processingActive.decrementAndGet()); } catch (Throwable e) { @@ -129,31 +127,34 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB } private Mono<Void> processMail(MailQueueItem queueItem) { - Mono<Mail> mailPublisher = Mono.fromSupplier(queueItem::getMail); - return mailPublisher.flatMap(mail -> Mono.fromRunnable(() -> LOGGER.debug("==== Begin processing mail {} ====", mail.getName())) - .subscribeOn(spooler) - .then(Mono.fromCallable(() -> performProcessMail(mail))) - .flatMap(any -> acknowledgeItem(queueItem, true)) - .onErrorResume(any -> acknowledgeItem(queueItem, false)) - .then(Mono.fromRunnable(() -> LOGGER.debug("==== End processing mail {} ====", mail.getName()))) - .then(Mono.fromRunnable(() -> LifecycleUtil.dispose(mail)))); + return Mono + .using( + queueItem::getMail, + resource -> Mono.just(resource) + .doOnNext(mail -> LOGGER.debug("==== Begin processing mail {} ====", mail.getName())) + .map(mail -> performProcessMail(queueItem, mail)) + .doOnNext(mail -> LOGGER.debug("==== End processing mail {} ====", mail.getName())), + LifecycleUtil::dispose) + .then(); } - private Mono<Void> acknowledgeItem(MailQueueItem queueItem, boolean success) { - return Mono.fromRunnable(Throwing.runnable(() -> queueItem.done(success)).sneakyThrow()); - } - private boolean performProcessMail(Mail mail) { + private Mail performProcessMail(MailQueueItem queueItem, Mail mail) { try { mailProcessor.service(mail); if (Thread.currentThread().isInterrupted()) { throw new InterruptedException("Thread has been interrupted"); } - return true; + queueItem.done(true); } catch (Exception e) { - throw new RuntimeException(e); + try { + queueItem.done(false); + } catch (MailQueue.MailQueueException ex) { + throw new RuntimeException(e); + } } + return mail; } /** --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org