This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 34d0b9c3b5 [ENHANCEMENT] Do not retry storing mails when
OverQuotaException (#2582)
34d0b9c3b5 is described below
commit 34d0b9c3b5ab1d4a1d282a4e3a507928b6c41e9e
Author: Trần Hồng Quân <[email protected]>
AuthorDate: Mon Jan 6 11:57:34 2025 +0700
[ENHANCEMENT] Do not retry storing mails when OverQuotaException (#2582)
---
.../james/transport/mailets/LocalDelivery.java | 1 +
.../transport/mailets/delivery/MailDispatcher.java | 36 +++++++++++++++++++---
.../mailets/delivery/MailboxAppenderImpl.java | 3 +-
.../mailets/delivery/MailDispatcherTest.java | 20 ++++++++++++
4 files changed, 54 insertions(+), 6 deletions(-)
diff --git
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/LocalDelivery.java
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/LocalDelivery.java
index a069db8ecb..e5f9c033d6 100644
---
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/LocalDelivery.java
+++
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/LocalDelivery.java
@@ -83,6 +83,7 @@ public class LocalDelivery extends GenericMailet {
.onMailetException(getInitParameter("onMailetException",
Mail.ERROR))
.retries(MailetUtil.getInitParameterAsInteger(getInitParameter("retries"),
Optional.of(MailDispatcher.RETRIES)))
.mailetContext(getMailetContext())
+ .usersRepository(usersRepository)
.build();
}
diff --git
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
index d115db5641..e393ec9d56 100644
---
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
+++
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailDispatcher.java
@@ -29,8 +29,12 @@ import jakarta.mail.MessagingException;
import jakarta.mail.internet.MimeMessage;
import org.apache.james.core.MailAddress;
+import org.apache.james.core.Username;
import org.apache.james.lifecycle.api.LifecycleUtil;
+import org.apache.james.mailbox.exception.OverQuotaException;
import org.apache.james.server.core.MailImpl;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.api.UsersRepositoryException;
import org.apache.james.util.AuditTrail;
import org.apache.mailet.Mail;
import org.apache.mailet.MailetContext;
@@ -64,6 +68,7 @@ public class MailDispatcher {
static final boolean DEFAULT_CONSUME = true;
static final String DEFAULT_ERROR_PROCESSOR = Mail.ERROR;
private MailStore mailStore;
+ private UsersRepository usersRepository;
private Boolean consume;
private MailetContext mailetContext;
private String onMailetException;
@@ -84,6 +89,11 @@ public class MailDispatcher {
return this;
}
+ public Builder usersRepository(UsersRepository usersRepository) {
+ this.usersRepository = usersRepository;
+ return this;
+ }
+
public Builder onMailetException(String onMailetException) {
this.onMailetException = onMailetException;
return this;
@@ -99,9 +109,11 @@ public class MailDispatcher {
public MailDispatcher build() {
Preconditions.checkNotNull(mailStore);
Preconditions.checkNotNull(mailetContext);
+ Preconditions.checkNotNull(usersRepository);
+
return new MailDispatcher(mailStore, mailetContext,
Optional.ofNullable(consume).orElse(DEFAULT_CONSUME),
- retries,
Optional.ofNullable(onMailetException).orElse(DEFAULT_ERROR_PROCESSOR));
+ retries,
Optional.ofNullable(onMailetException).orElse(DEFAULT_ERROR_PROCESSOR),
usersRepository);
}
}
@@ -112,8 +124,9 @@ public class MailDispatcher {
private final boolean propagate;
private final Optional<Integer> retries;
private final String errorProcessor;
+ private final UsersRepository usersRepository;
- private MailDispatcher(MailStore mailStore, MailetContext mailetContext,
boolean consume, Optional<Integer> retries, String onMailetException) {
+ private MailDispatcher(MailStore mailStore, MailetContext mailetContext,
boolean consume, Optional<Integer> retries, String onMailetException,
UsersRepository usersRepository) {
this.mailStore = mailStore;
this.consume = consume;
this.mailetContext = mailetContext;
@@ -121,6 +134,7 @@ public class MailDispatcher {
this.errorProcessor = onMailetException;
this.ignoreError = onMailetException.equalsIgnoreCase("ignore");
this.propagate = onMailetException.equalsIgnoreCase("propagate");
+ this.usersRepository = usersRepository;
}
public void dispatch(Mail mail) throws MessagingException {
@@ -195,18 +209,32 @@ public class MailDispatcher {
}
private Mono<Void> storeMailWithRetry(Mail mail, MailAddress recipient) {
+ Username username = computeUsername(recipient);
AtomicInteger remainRetries = new AtomicInteger(retries.orElse(0));
Mono<Void> operation = Mono.from(mailStore.storeMail(recipient, mail))
- .doOnError(error -> LOGGER.warn("Error While storing mail. This
error will be retried for {} more times.", remainRetries.getAndDecrement(),
error));
+ .doOnError(OverQuotaException.class, e -> LOGGER.info("Could not
store mail due to quota error for user {}", username.asString()))
+ .doOnError(e -> !(e instanceof OverQuotaException), error ->
LOGGER.warn("Error While storing mail. This error will be retried for {} more
times.", remainRetries.getAndDecrement(), error));
return retries.map(count ->
operation
- .retryWhen(Retry.backoff(count,
FIRST_BACKOFF).maxBackoff(MAX_BACKOFF).scheduler(Schedulers.parallel()))
+ .retryWhen(Retry.backoff(count, FIRST_BACKOFF)
+ .maxBackoff(MAX_BACKOFF)
+ .filter(e -> !(e instanceof OverQuotaException))
+ .scheduler(Schedulers.parallel()))
.then())
.orElse(operation);
}
+ private Username computeUsername(MailAddress recipient) {
+ try {
+ return usersRepository.getUsername(recipient);
+ } catch (UsersRepositoryException e) {
+ LOGGER.warn("Unable to retrieve username for {}",
recipient.asPrettyString(), e);
+ return Username.of(recipient.asString());
+ }
+ }
+
private Map<String, List<String>> saveHeaders(Mail mail, MailAddress
recipient) throws MessagingException {
ImmutableMap.Builder<String, List<String>> backup =
ImmutableMap.builder();
Collection<String> headersToSave =
mail.getPerRecipientSpecificHeaders().getHeaderNamesForRecipient(recipient);
diff --git
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailboxAppenderImpl.java
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailboxAppenderImpl.java
index 3cc8b974c4..0f5859b349 100644
---
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailboxAppenderImpl.java
+++
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/delivery/MailboxAppenderImpl.java
@@ -124,8 +124,7 @@ public class MailboxAppenderImpl implements MailboxAppender
{
},
session -> appendMessageToMailbox(mail, session, mailboxPath,
flags),
this::closeProcessing)
- .onErrorMap(OverQuotaException.class, e -> new
MessagingException("Could not append due to quota error", e))
- .onErrorMap(MailboxException.class, e -> new
MessagingException("Unable to access mailbox.", e));
+ .onErrorMap(e -> e instanceof MailboxException && !(e instanceof
OverQuotaException), e -> new MessagingException("Unable to access mailbox.",
(MailboxException) e));
}
protected Mono<AppendResult> appendMessageToMailbox(MimeMessage mail,
MailboxSession session, MailboxPath path, Optional<Flags> flags) {
diff --git
a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/delivery/MailDispatcherTest.java
b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/delivery/MailDispatcherTest.java
index d1c76870a8..8b86d5f6c8 100644
---
a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/delivery/MailDispatcherTest.java
+++
b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/delivery/MailDispatcherTest.java
@@ -38,6 +38,8 @@ import jakarta.mail.MessagingException;
import org.apache.commons.io.IOUtils;
import org.apache.james.core.MailAddress;
import org.apache.james.core.builder.MimeMessageBuilder;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.memory.MemoryUsersRepository;
import org.apache.james.util.MimeMessageUtil;
import org.apache.mailet.Mail;
import org.apache.mailet.PerRecipientHeaders.Header;
@@ -64,12 +66,14 @@ class MailDispatcherTest {
private FakeMailContext fakeMailContext;
private MailStore mailStore;
+ private UsersRepository usersRepository;
@BeforeEach
public void setUp() throws Exception {
fakeMailContext = FakeMailContext.defaultContext();
mailStore = mock(MailStore.class);
when(mailStore.storeMail(any(), any())).thenReturn(Mono.empty());
+ usersRepository = MemoryUsersRepository.withVirtualHosting(null);
}
@Test
@@ -78,6 +82,7 @@ class MailDispatcherTest {
.mailetContext(fakeMailContext)
.mailStore(mailStore)
.consume(true)
+ .usersRepository(usersRepository)
.build();
FakeMail mail = FakeMail.builder()
@@ -101,6 +106,7 @@ class MailDispatcherTest {
.retries(3)
.mailStore(mailStore)
.consume(true)
+ .usersRepository(usersRepository)
.build();
AtomicInteger counter = new AtomicInteger(0);
@@ -132,6 +138,7 @@ class MailDispatcherTest {
.retries(0)
.mailStore(mailStore)
.consume(true)
+ .usersRepository(usersRepository)
.build();
AtomicInteger counter = new AtomicInteger(0);
@@ -162,6 +169,7 @@ class MailDispatcherTest {
.mailetContext(fakeMailContext)
.mailStore(mailStore)
.consume(true)
+ .usersRepository(usersRepository)
.build();
FakeMail mail = FakeMail.builder()
@@ -181,6 +189,7 @@ class MailDispatcherTest {
.mailetContext(fakeMailContext)
.mailStore(mailStore)
.consume(false)
+ .usersRepository(usersRepository)
.build();
String state = "state";
@@ -201,6 +210,7 @@ class MailDispatcherTest {
.mailetContext(fakeMailContext)
.mailStore(mailStore)
.consume(true)
+ .usersRepository(usersRepository)
.build();
doReturn(Mono.error(new MessagingException()))
.when(mailStore)
@@ -237,6 +247,7 @@ class MailDispatcherTest {
.mailetContext(fakeMailContext)
.mailStore(mailStore)
.consume(false)
+ .usersRepository(usersRepository)
.build();
FakeMail mail = FakeMail.builder()
@@ -262,6 +273,7 @@ class MailDispatcherTest {
.mailetContext(fakeMailContext)
.mailStore(accumulatorTestHeaderMailStore)
.consume(false)
+ .usersRepository(usersRepository)
.build();
FakeMail mail = FakeMail.builder()
@@ -285,6 +297,7 @@ class MailDispatcherTest {
.mailetContext(fakeMailContext)
.mailStore(accumulatorTestHeaderMailStore)
.consume(false)
+ .usersRepository(usersRepository)
.build();
FakeMail mail = FakeMail.builder()
@@ -308,6 +321,7 @@ class MailDispatcherTest {
.mailetContext(fakeMailContext)
.mailStore(accumulatorTestHeaderMailStore)
.consume(false)
+ .usersRepository(usersRepository)
.build();
FakeMail mail = FakeMail.builder()
@@ -333,6 +347,7 @@ class MailDispatcherTest {
.mailetContext(fakeMailContext)
.mailStore(accumulatorTestHeaderMailStore)
.consume(false)
+ .usersRepository(usersRepository)
.build();
FakeMail mail = FakeMail.builder()
@@ -359,6 +374,7 @@ class MailDispatcherTest {
.mailetContext(fakeMailContext)
.mailStore(accumulatorTestHeaderMailStore)
.consume(false)
+ .usersRepository(usersRepository)
.build();
FakeMail mail = FakeMail.builder()
@@ -382,6 +398,7 @@ class MailDispatcherTest {
.mailetContext(fakeMailContext)
.mailStore(accumulatorTestHeaderMailStore)
.consume(false)
+ .usersRepository(usersRepository)
.build();
String headerValue = "arbitraryValue";
@@ -406,6 +423,7 @@ class MailDispatcherTest {
.mailetContext(fakeMailContext)
.mailStore(mailStore)
.onMailetException("ignore")
+ .usersRepository(usersRepository)
.build();
doReturn(Mono.error(new MessagingException()))
@@ -434,6 +452,7 @@ class MailDispatcherTest {
.mailetContext(fakeMailContext)
.mailStore(mailStore)
.onMailetException("errorProcessor1")
+ .usersRepository(usersRepository)
.build();
doReturn(Mono.error(new MessagingException()))
@@ -463,6 +482,7 @@ class MailDispatcherTest {
.mailetContext(fakeMailContext)
.mailStore(mailStore)
.onMailetException("propagate")
+ .usersRepository(usersRepository)
.build();
doReturn(Mono.error(new MessagingException()))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]