This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4ffa7e89d101732669d8f5af4ef21806d33bad3e
Author: Matthieu Baechler <matth...@apache.org>
AuthorDate: Fri Dec 6 11:54:18 2019 +0100

    JAMES-2979 try to simplify Spooler code
---
 .../mailetcontainer/impl/JamesMailSpooler.java     | 35 +++++++++++-----------
 1 file changed, 18 insertions(+), 17 deletions(-)

diff --git 
a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
 
b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
index 2fb1799..63a67e7 100644
--- 
a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
+++ 
b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
@@ -43,8 +43,6 @@ import org.apache.mailet.Mail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.github.fge.lambdas.Throwing;
-
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
@@ -120,7 +118,7 @@ public class JamesMailSpooler implements Disposable, 
Configurable, MailSpoolerMB
         TimeMetric timeMetric = metricFactory.timer(SPOOL_PROCESSING);
         try {
             return Mono.fromCallable(processingActive::incrementAndGet)
-                .flatMap(ignore -> processMail(queueItem))
+                .flatMap(ignore -> processMail(queueItem).subscribeOn(spooler))
                 .doOnSuccess(any -> timeMetric.stopAndPublish())
                 .doOnSuccess(any -> processingActive.decrementAndGet());
         } catch (Throwable e) {
@@ -129,31 +127,34 @@ public class JamesMailSpooler implements Disposable, 
Configurable, MailSpoolerMB
     }
 
     private Mono<Void> processMail(MailQueueItem queueItem) {
-        Mono<Mail> mailPublisher = Mono.fromSupplier(queueItem::getMail);
-        return mailPublisher.flatMap(mail -> Mono.fromRunnable(() -> 
LOGGER.debug("==== Begin processing mail {} ====", mail.getName()))
-            .subscribeOn(spooler)
-            .then(Mono.fromCallable(() -> performProcessMail(mail)))
-            .flatMap(any -> acknowledgeItem(queueItem, true))
-            .onErrorResume(any -> acknowledgeItem(queueItem, false))
-            .then(Mono.fromRunnable(() -> LOGGER.debug("==== End processing 
mail {} ====", mail.getName())))
-            .then(Mono.fromRunnable(() -> LifecycleUtil.dispose(mail))));
+        return Mono
+            .using(
+                queueItem::getMail,
+                resource -> Mono.just(resource)
+                    .doOnNext(mail -> LOGGER.debug("==== Begin processing mail 
{} ====", mail.getName()))
+                    .map(mail -> performProcessMail(queueItem, mail))
+                    .doOnNext(mail -> LOGGER.debug("==== End processing mail 
{} ====", mail.getName())),
+                LifecycleUtil::dispose)
+            .then();
     }
 
-    private Mono<Void> acknowledgeItem(MailQueueItem queueItem, boolean 
success) {
-        return Mono.fromRunnable(Throwing.runnable(() -> 
queueItem.done(success)).sneakyThrow());
-    }
 
-    private boolean performProcessMail(Mail mail) {
+    private Mail performProcessMail(MailQueueItem queueItem, Mail mail) {
         try {
             mailProcessor.service(mail);
 
             if (Thread.currentThread().isInterrupted()) {
                 throw new InterruptedException("Thread has been interrupted");
             }
-            return true;
+            queueItem.done(true);
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            try {
+                queueItem.done(false);
+            } catch (MailQueue.MailQueueException ex) {
+                throw new RuntimeException(e);
+            }
         }
+        return mail;
     }
 
     /**


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to