This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 61d975e228 [FIX] Ability to split emails with large recipient counts
(#2526)
61d975e228 is described below
commit 61d975e228adb360b29a9b10be91e54e6f1dbbb7
Author: Benoit TELLIER <[email protected]>
AuthorDate: Fri Nov 29 08:44:17 2024 +0100
[FIX] Ability to split emails with large recipient counts (#2526)
---
docs/modules/servers/partials/MailToAllUsers.adoc | 12 +--
docs/modules/servers/partials/SplitMail.adoc | 16 ++++
.../servers/partials/configure/mailets.adoc | 4 +
.../james/transport/mailets/MailToAllUsers.java | 62 +--------------
.../{MailToAllUsers.java => SplitMail.java} | 33 +++-----
.../transport/mailets/MailToAllUsersTest.java | 37 +--------
...{MailToAllUsersTest.java => SplitMailTest.java} | 87 +++++++++++++---------
7 files changed, 94 insertions(+), 157 deletions(-)
diff --git a/docs/modules/servers/partials/MailToAllUsers.adoc
b/docs/modules/servers/partials/MailToAllUsers.adoc
index caf5516d3f..93f7834ad7 100644
--- a/docs/modules/servers/partials/MailToAllUsers.adoc
+++ b/docs/modules/servers/partials/MailToAllUsers.adoc
@@ -1,12 +1,6 @@
=== MailToAllUsers
-A mailet that helps to email to all users in the system. The emails are sent
in batches to manage
-the load. The first batch is sent directly, while the remaining batches are
sent asynchronously.
-
-==== Configuration parameters
-
-*batchSize*::
-The number of recipients to include in each batch. Optional, default to 100.
+A mailet that helps to email all users in the system.
==== Sample configuration
@@ -16,9 +10,7 @@ The number of recipients to include in each batch. Optional,
default to 100.
<matcher match="[email protected]"/>
<matcher match="[email protected]"/>
</matcher>
-<mailet match="notify-matcher" class="MailToAllUsers">
- <batchSize>100</batchSize>
-</mailet>
+<mailet match="notify-matcher" class="MailToAllUsers"/>
----
diff --git a/docs/modules/servers/partials/SplitMail.adoc
b/docs/modules/servers/partials/SplitMail.adoc
new file mode 100644
index 0000000000..0ac2be5d3e
--- /dev/null
+++ b/docs/modules/servers/partials/SplitMail.adoc
@@ -0,0 +1,16 @@
+=== SplitMail
+
+A mailet to split email with too many recipients. The first batch is sent
directly, while the remaining batches are sent asynchronously.
+The batch size can be configured via the *batchSize* parameter (optional,
defaults to 100).
+
+==== Sample configuration
+
+[source,xml]
+----
+<mailet match="notify-matcher" class="SplitMail">
+ <batchSize>100</batchSize>
+</mailet>
+----
+
+
+
diff --git a/docs/modules/servers/partials/configure/mailets.adoc
b/docs/modules/servers/partials/configure/mailets.adoc
index 7a8b690a6d..adcdff20d6 100644
--- a/docs/modules/servers/partials/configure/mailets.adoc
+++ b/docs/modules/servers/partials/configure/mailets.adoc
@@ -45,6 +45,8 @@ include::partial$MailAttributesListToMimeHeaders.adoc[]
include::partial$MailAttributesToMimeHeaders.adoc[]
+include::partial$MailToAllUsers.adoc[]
+
include::partial$MetricsMailet.adoc[]
include::partial$MimeDecodingMailet.adoc[]
@@ -97,6 +99,8 @@ include::partial$SpamAssassin.adoc[]
include::partial$StripAttachment.adoc[]
+include::partial$SplitMail.adoc[]
+
include::partial$TextCalendarBodyToAttachment.adoc[]
include::partial$ToProcessor.adoc[]
diff --git
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java
index 46a28d42c0..4d63c58cd5 100644
---
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java
+++
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java
@@ -19,30 +19,20 @@
package org.apache.james.transport.mailets;
-import java.util.Optional;
-import java.util.function.Function;
-
import jakarta.inject.Inject;
import jakarta.mail.MessagingException;
-import org.apache.james.core.MailAddress;
import org.apache.james.core.Username;
-import org.apache.james.lifecycle.api.LifecycleUtil;
import org.apache.james.user.api.UsersRepository;
import org.apache.mailet.Mail;
import org.apache.mailet.base.GenericMailet;
-import org.reactivestreams.Publisher;
import com.github.fge.lambdas.Throwing;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
/**
- * A mailet that helps to email all users in the system. The emails are sent
in batches to manage
- * the load. The first batch is sent directly, while the remaining batches are
sent asynchronously.
- * The batch size can be configured via the <b>batchSize</b> parameter
(optional, defaults to 100).
+ * A mailet that helps to email all users in the system.
*
* <h3>Configuration</h3>
* <pre><code>
@@ -51,70 +41,26 @@ import reactor.util.function.Tuple2;
* <matcher match="[email protected]"/>
* <matcher match="[email protected]"/>
* </matcher>
- * <mailet match="notify-matcher" class="MailToAllUsers">
- * <batchSize>100</batchSize>
- * </mailet>
+ * <mailet match="notify-matcher" class="MailToAllUsers"/>
* }
* </code></pre>
*
*/
public class MailToAllUsers extends GenericMailet {
- private static final int DEFAULT_BATCH_SIZE = 100;
private final UsersRepository usersRepository;
- private int batchSize;
@Inject
public MailToAllUsers(UsersRepository usersRepository) {
this.usersRepository = usersRepository;
}
- @Override
- public void init() throws MessagingException {
- batchSize =
Integer.parseInt(Optional.ofNullable(getInitParameter("batchSize"))
- .orElse(String.valueOf(DEFAULT_BATCH_SIZE)));
- }
-
@Override
public void service(Mail mail) throws MessagingException {
Flux.from(usersRepository.listReactive())
.map(Throwing.function(Username::asMailAddress))
- .window(batchSize)
- .index()
- .flatMap(sendMail(mail))
- .then()
- .block();
- }
-
- private Function<Tuple2<Long, Flux<MailAddress>>, Publisher<Void>>
sendMail(Mail mail) {
- return tuple -> {
- boolean firstBatch = tuple.getT1() == 0;
- if (firstBatch) {
- return sendMailToFirstRecipientsBatchDirectly(mail,
tuple.getT2());
- }
- return sendMailToRemainingRecipientsBatchAsynchronously(mail,
tuple.getT2());
- };
- }
-
- private Mono<Void> sendMailToFirstRecipientsBatchDirectly(Mail mail,
Flux<MailAddress> firstRecipientsBatch) {
- return firstRecipientsBatch
- .collectList()
- .flatMap(recipients -> Mono.fromRunnable(() ->
mail.setRecipients(recipients)))
- .then();
- }
-
- private Mono<Void> sendMailToRemainingRecipientsBatchAsynchronously(Mail
mail, Flux<MailAddress> remainingRecipientsBatch) {
- return remainingRecipientsBatch
.collectList()
- .flatMap(recipients -> Mono.fromRunnable(Throwing.runnable(() -> {
- Mail duplicateMail = mail.duplicate();
- try {
- duplicateMail.setRecipients(recipients);
- getMailetContext().sendMail(duplicateMail);
- } finally {
- LifecycleUtil.dispose(duplicateMail);
- }
- })))
- .then();
+ .doOnNext(mail::setRecipients)
+ .block();
}
@Override
diff --git
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/SplitMail.java
similarity index 81%
copy from
server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java
copy to
server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/SplitMail.java
index 46a28d42c0..b1a746e3d9 100644
---
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java
+++
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/SplitMail.java
@@ -22,13 +22,10 @@ package org.apache.james.transport.mailets;
import java.util.Optional;
import java.util.function.Function;
-import jakarta.inject.Inject;
import jakarta.mail.MessagingException;
import org.apache.james.core.MailAddress;
-import org.apache.james.core.Username;
import org.apache.james.lifecycle.api.LifecycleUtil;
-import org.apache.james.user.api.UsersRepository;
import org.apache.mailet.Mail;
import org.apache.mailet.base.GenericMailet;
import org.reactivestreams.Publisher;
@@ -40,8 +37,7 @@ import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
/**
- * A mailet that helps to email all users in the system. The emails are sent
in batches to manage
- * the load. The first batch is sent directly, while the remaining batches are
sent asynchronously.
+ * A mailet to split email with too many recipients. The first batch is sent
directly, while the remaining batches are sent asynchronously.
* The batch size can be configured via the <b>batchSize</b> parameter
(optional, defaults to 100).
*
* <h3>Configuration</h3>
@@ -51,23 +47,17 @@ import reactor.util.function.Tuple2;
* <matcher match="[email protected]"/>
* <matcher match="[email protected]"/>
* </matcher>
- * <mailet match="notify-matcher" class="MailToAllUsers">
+ * <mailet match="notify-matcher" class="SplitMail">
* <batchSize>100</batchSize>
* </mailet>
* }
* </code></pre>
*
*/
-public class MailToAllUsers extends GenericMailet {
+public class SplitMail extends GenericMailet {
private static final int DEFAULT_BATCH_SIZE = 100;
- private final UsersRepository usersRepository;
private int batchSize;
- @Inject
- public MailToAllUsers(UsersRepository usersRepository) {
- this.usersRepository = usersRepository;
- }
-
@Override
public void init() throws MessagingException {
batchSize =
Integer.parseInt(Optional.ofNullable(getInitParameter("batchSize"))
@@ -76,13 +66,14 @@ public class MailToAllUsers extends GenericMailet {
@Override
public void service(Mail mail) throws MessagingException {
- Flux.from(usersRepository.listReactive())
- .map(Throwing.function(Username::asMailAddress))
- .window(batchSize)
- .index()
- .flatMap(sendMail(mail))
- .then()
- .block();
+ if (mail.getRecipients().stream().count() > batchSize) {
+ Flux.fromIterable(mail.getRecipients())
+ .window(batchSize)
+ .index()
+ .flatMap(sendMail(mail))
+ .then()
+ .block();
+ }
}
private Function<Tuple2<Long, Flux<MailAddress>>, Publisher<Void>>
sendMail(Mail mail) {
@@ -119,6 +110,6 @@ public class MailToAllUsers extends GenericMailet {
@Override
public String getMailetName() {
- return "MailToAllUsers";
+ return "SplitMail";
}
}
diff --git
a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java
b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java
index be7dd91657..dc97da6a5c 100644
---
a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java
+++
b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java
@@ -21,7 +21,6 @@ package org.apache.james.transport.mailets;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import jakarta.mail.MessagingException;
@@ -30,13 +29,9 @@ import org.apache.james.core.Username;
import org.apache.james.core.builder.MimeMessageBuilder;
import org.apache.james.user.api.UsersRepository;
import org.apache.mailet.Mail;
-import org.apache.mailet.MailetContext;
import org.apache.mailet.base.test.FakeMail;
-import org.apache.mailet.base.test.FakeMailetConfig;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
import reactor.core.publisher.Flux;
@@ -47,51 +42,27 @@ class MailToAllUsersTest {
private static final Username USER_4 = Username.of("[email protected]");
private UsersRepository usersRepository;
- private MailetContext mailetContext;
private MailToAllUsers testee;
@BeforeEach
- void setUp() {
+ void setUp() throws Exception {
usersRepository = mock(UsersRepository.class);
- mailetContext = Mockito.mock(MailetContext.class);
testee = new MailToAllUsers(usersRepository);
}
@Test
- void firstUsersBatchShouldBeSentDirectly() throws Exception {
+ void shouldSendAMailToAllUsers() throws Exception {
when(usersRepository.listReactive())
.thenReturn(Flux.just(USER_1, USER_2, USER_3, USER_4));
- testee.init(FakeMailetConfig.builder()
- .mailetContext(mailetContext)
- .setProperty("batchSize", "2")
- .build());
-
Mail originalMail = createMail();
testee.service(originalMail);
assertThat(originalMail.getRecipients())
- .containsExactlyInAnyOrder(USER_1.asMailAddress(),
USER_2.asMailAddress());
+ .containsExactlyInAnyOrder(USER_1.asMailAddress(),
USER_2.asMailAddress(),
+ USER_3.asMailAddress(), USER_4.asMailAddress());
}
- @Test
- void remainingUsersBatchesShouldBeSentAsync() throws Exception {
- when(usersRepository.listReactive())
- .thenReturn(Flux.just(USER_1, USER_2, USER_3, USER_4));
-
- testee.init(FakeMailetConfig.builder()
- .mailetContext(mailetContext)
- .setProperty("batchSize", "2")
- .build());
-
- Mail originalMail = createMail();
- testee.service(originalMail);
-
- ArgumentCaptor<Mail> mailCaptor = ArgumentCaptor.forClass(Mail.class);
- verify(mailetContext).sendMail(mailCaptor.capture());
- assertThat(mailCaptor.getValue().getRecipients())
- .containsExactly(USER_3.asMailAddress(), USER_4.asMailAddress());
- }
private Mail createMail() throws MessagingException {
return FakeMail.builder()
diff --git
a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java
b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/SplitMailTest.java
similarity index 59%
copy from
server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java
copy to
server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/SplitMailTest.java
index be7dd91657..2dbac525a2 100644
---
a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java
+++
b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/SplitMailTest.java
@@ -20,55 +20,61 @@
package org.apache.james.transport.mailets;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
import jakarta.mail.MessagingException;
import org.apache.james.core.Username;
import org.apache.james.core.builder.MimeMessageBuilder;
-import org.apache.james.user.api.UsersRepository;
import org.apache.mailet.Mail;
-import org.apache.mailet.MailetContext;
import org.apache.mailet.base.test.FakeMail;
+import org.apache.mailet.base.test.FakeMailContext;
import org.apache.mailet.base.test.FakeMailetConfig;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
-import reactor.core.publisher.Flux;
+import com.google.common.collect.ImmutableList;
-class MailToAllUsersTest {
+class SplitMailTest {
private static final Username USER_1 = Username.of("[email protected]");
private static final Username USER_2 = Username.of("[email protected]");
private static final Username USER_3 = Username.of("[email protected]");
private static final Username USER_4 = Username.of("[email protected]");
+ private static final Username USER_5 = Username.of("[email protected]");
- private UsersRepository usersRepository;
- private MailetContext mailetContext;
- private MailToAllUsers testee;
+ private FakeMailContext mailetContext;
+ private SplitMail splitMail;
@BeforeEach
- void setUp() {
- usersRepository = mock(UsersRepository.class);
- mailetContext = Mockito.mock(MailetContext.class);
- testee = new MailToAllUsers(usersRepository);
+ void setUp() throws Exception {
+ mailetContext = FakeMailContext.defaultContext();
+ splitMail = new SplitMail();
+ splitMail.init(FakeMailetConfig.builder()
+ .mailetContext(mailetContext)
+ .setProperty("batchSize", "2")
+ .build());
}
@Test
- void firstUsersBatchShouldBeSentDirectly() throws Exception {
- when(usersRepository.listReactive())
- .thenReturn(Flux.just(USER_1, USER_2, USER_3, USER_4));
+ void firstUsersBatchShouldBeSentDirectlyWhenExactlyBatchSize() throws
Exception {
+ Mail originalMail = createMail();
+ originalMail.setRecipients(ImmutableList.of(USER_1.asMailAddress(),
+ USER_2.asMailAddress()));
- testee.init(FakeMailetConfig.builder()
- .mailetContext(mailetContext)
- .setProperty("batchSize", "2")
- .build());
+ splitMail.service(originalMail);
+
+ assertThat(originalMail.getRecipients())
+ .containsExactlyInAnyOrder(USER_1.asMailAddress(),
USER_2.asMailAddress());
+ }
+ @Test
+ void firstUsersBatchShouldBeSentDirectly() throws Exception {
Mail originalMail = createMail();
- testee.service(originalMail);
+ originalMail.setRecipients(ImmutableList.of(USER_1.asMailAddress(),
+ USER_2.asMailAddress(),
+ USER_3.asMailAddress(),
+ USER_4.asMailAddress()));
+
+ splitMail.service(originalMail);
assertThat(originalMail.getRecipients())
.containsExactlyInAnyOrder(USER_1.asMailAddress(),
USER_2.asMailAddress());
@@ -76,21 +82,32 @@ class MailToAllUsersTest {
@Test
void remainingUsersBatchesShouldBeSentAsync() throws Exception {
- when(usersRepository.listReactive())
- .thenReturn(Flux.just(USER_1, USER_2, USER_3, USER_4));
+ Mail originalMail = createMail();
+ originalMail.setRecipients(ImmutableList.of(USER_1.asMailAddress(),
+ USER_2.asMailAddress(),
+ USER_3.asMailAddress(),
+ USER_4.asMailAddress()));
- testee.init(FakeMailetConfig.builder()
- .mailetContext(mailetContext)
- .setProperty("batchSize", "2")
- .build());
+ splitMail.service(originalMail);
+ assertThat(mailetContext.getSentMails().getFirst().getRecipients())
+ .containsExactly(USER_3.asMailAddress(), USER_4.asMailAddress());
+ }
+
+ @Test
+ void remainingUsersBatchesShouldBeSentAsyncInSeveralBatches() throws
Exception {
Mail originalMail = createMail();
- testee.service(originalMail);
+ originalMail.setRecipients(ImmutableList.of(USER_1.asMailAddress(),
+ USER_2.asMailAddress(),
+ USER_3.asMailAddress(),
+ USER_4.asMailAddress(),
+ USER_5.asMailAddress()));
- ArgumentCaptor<Mail> mailCaptor = ArgumentCaptor.forClass(Mail.class);
- verify(mailetContext).sendMail(mailCaptor.capture());
- assertThat(mailCaptor.getValue().getRecipients())
- .containsExactly(USER_3.asMailAddress(), USER_4.asMailAddress());
+ splitMail.service(originalMail);
+
+ assertThat(mailetContext.getSentMails())
+ .hasSize(2)
+ .allMatch(mail -> mail.getRecipients().size() <= 2);
}
private Mail createMail() throws MessagingException {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]