This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit fb23ed7c703a6b8afdb0bcb51c5b6446815ba2fc
Author: Rémi KOWALSKI <rkowal...@linagora.com>
AuthorDate: Thu Nov 21 14:07:25 2019 +0100

    JAMES-2979 retry storing mail with a boundedElastic scheduler in 
Maildispatcher
---
 .../transport/mailets/delivery/MailDispatcher.java | 26 ++++++++++++++++++----
 1 file changed, 22 insertions(+), 4 deletions(-)

diff --git 
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
 
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
index 339323f..4737012 100644
--- 
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
+++ 
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
@@ -18,6 +18,7 @@
  ****************************************************************/
 package org.apache.james.transport.mailets.delivery;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -36,20 +37,27 @@ import org.apache.mailet.base.RFC2822Headers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.fge.lambdas.runnable.ThrowingRunnable;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
 
 public class MailDispatcher {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MailDispatcher.class);
-    public static final String[] NO_HEADERS = {};
+    private static final String[] NO_HEADERS = {};
+    private static final int RETRIES = 3;
+    private static final Duration FIRST_BACKOFF = Duration.ofMillis(200);
+    private static final Duration MAX_BACKOFF = Duration.ofSeconds(1);
 
     public static Builder builder() {
         return new Builder();
     }
 
     public static class Builder {
-        public static final boolean CONSUME = true;
+        static final boolean CONSUME = true;
         private MailStore mailStore;
         private Optional<Boolean> consume = Optional.empty();
         private MailetContext mailetContext;
@@ -80,11 +88,13 @@ public class MailDispatcher {
     private final MailStore mailStore;
     private final boolean consume;
     private final MailetContext mailetContext;
+    private final Scheduler scheduler;
 
     private MailDispatcher(MailStore mailStore, boolean consume, MailetContext 
mailetContext) {
         this.mailStore = mailStore;
         this.consume = consume;
         this.mailetContext = mailetContext;
+        this.scheduler = Schedulers.boundedElastic();
     }
 
     public void dispatch(Mail mail) throws MessagingException {
@@ -130,8 +140,8 @@ public class MailDispatcher {
                 Map<String, List<String>> savedHeaders = saveHeaders(mail, 
recipient);
 
                 addSpecificHeadersForRecipient(mail, message, recipient);
-                mailStore.storeMail(recipient, mail);
-                
+                storeMailWithRetry(mail, recipient).block();
+
                 restoreHeaders(mail.getMessage(), savedHeaders);
             } catch (Exception ex) {
                 LOGGER.error("Error while storing mail.", ex);
@@ -141,6 +151,14 @@ public class MailDispatcher {
         return errors;
     }
 
+    private Mono<Void> storeMailWithRetry(Mail mail, MailAddress recipient) {
+       return Mono.fromRunnable((ThrowingRunnable)() -> 
mailStore.storeMail(recipient, mail))
+           .doOnError(error -> LOGGER.error("Error While storing mail.", 
error))
+           .subscribeOn(scheduler)
+           .retryBackoff(RETRIES, FIRST_BACKOFF, MAX_BACKOFF, scheduler)
+           .then();
+    }
+
     private Map<String, List<String>> saveHeaders(Mail mail, MailAddress 
recipient) throws MessagingException {
         ImmutableMap.Builder<String, List<String>> backup = 
ImmutableMap.builder();
         Collection<String> headersToSave = 
mail.getPerRecipientSpecificHeaders().getHeaderNamesForRecipient(recipient);


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to