This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit fb23ed7c703a6b8afdb0bcb51c5b6446815ba2fc Author: Rémi KOWALSKI <rkowal...@linagora.com> AuthorDate: Thu Nov 21 14:07:25 2019 +0100 JAMES-2979 retry storing mail with a boundedElastic scheduler in Maildispatcher --- .../transport/mailets/delivery/MailDispatcher.java | 26 ++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java index 339323f..4737012 100644 --- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java +++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java @@ -18,6 +18,7 @@ ****************************************************************/ package org.apache.james.transport.mailets.delivery; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -36,20 +37,27 @@ import org.apache.mailet.base.RFC2822Headers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.fge.lambdas.runnable.ThrowingRunnable; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; public class MailDispatcher { private static final Logger LOGGER = LoggerFactory.getLogger(MailDispatcher.class); - public static final String[] NO_HEADERS = {}; + private static final String[] NO_HEADERS = {}; + private static final int RETRIES = 3; + private static final Duration FIRST_BACKOFF = Duration.ofMillis(200); + private static final Duration MAX_BACKOFF = Duration.ofSeconds(1); public static Builder builder() { return new Builder(); } public static class Builder { - public static final boolean CONSUME = true; + static final boolean CONSUME = true; private MailStore mailStore; private Optional<Boolean> consume = Optional.empty(); private MailetContext mailetContext; @@ -80,11 +88,13 @@ public class MailDispatcher { private final MailStore mailStore; private final boolean consume; private final MailetContext mailetContext; + private final Scheduler scheduler; private MailDispatcher(MailStore mailStore, boolean consume, MailetContext mailetContext) { this.mailStore = mailStore; this.consume = consume; this.mailetContext = mailetContext; + this.scheduler = Schedulers.boundedElastic(); } public void dispatch(Mail mail) throws MessagingException { @@ -130,8 +140,8 @@ public class MailDispatcher { Map<String, List<String>> savedHeaders = saveHeaders(mail, recipient); addSpecificHeadersForRecipient(mail, message, recipient); - mailStore.storeMail(recipient, mail); - + storeMailWithRetry(mail, recipient).block(); + restoreHeaders(mail.getMessage(), savedHeaders); } catch (Exception ex) { LOGGER.error("Error while storing mail.", ex); @@ -141,6 +151,14 @@ public class MailDispatcher { return errors; } + private Mono<Void> storeMailWithRetry(Mail mail, MailAddress recipient) { + return Mono.fromRunnable((ThrowingRunnable)() -> mailStore.storeMail(recipient, mail)) + .doOnError(error -> LOGGER.error("Error While storing mail.", error)) + .subscribeOn(scheduler) + .retryBackoff(RETRIES, FIRST_BACKOFF, MAX_BACKOFF, scheduler) + .then(); + } + private Map<String, List<String>> saveHeaders(Mail mail, MailAddress recipient) throws MessagingException { ImmutableMap.Builder<String, List<String>> backup = ImmutableMap.builder(); Collection<String> headersToSave = mail.getPerRecipientSpecificHeaders().getHeaderNamesForRecipient(recipient); --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org