This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 61d975e228 [FIX] Ability to split emails with large recipient counts 
(#2526)
61d975e228 is described below

commit 61d975e228adb360b29a9b10be91e54e6f1dbbb7
Author: Benoit TELLIER <[email protected]>
AuthorDate: Fri Nov 29 08:44:17 2024 +0100

    [FIX] Ability to split emails with large recipient counts (#2526)
---
 docs/modules/servers/partials/MailToAllUsers.adoc  | 12 +--
 docs/modules/servers/partials/SplitMail.adoc       | 16 ++++
 .../servers/partials/configure/mailets.adoc        |  4 +
 .../james/transport/mailets/MailToAllUsers.java    | 62 +--------------
 .../{MailToAllUsers.java => SplitMail.java}        | 33 +++-----
 .../transport/mailets/MailToAllUsersTest.java      | 37 +--------
 ...{MailToAllUsersTest.java => SplitMailTest.java} | 87 +++++++++++++---------
 7 files changed, 94 insertions(+), 157 deletions(-)

diff --git a/docs/modules/servers/partials/MailToAllUsers.adoc 
b/docs/modules/servers/partials/MailToAllUsers.adoc
index caf5516d3f..93f7834ad7 100644
--- a/docs/modules/servers/partials/MailToAllUsers.adoc
+++ b/docs/modules/servers/partials/MailToAllUsers.adoc
@@ -1,12 +1,6 @@
 === MailToAllUsers
 
-A mailet that helps to email to all users in the system. The emails are sent 
in batches to manage
-the load. The first batch is sent directly, while the remaining batches are 
sent asynchronously.
-
-==== Configuration parameters
-
-*batchSize*::
-The number of recipients to include in each batch. Optional, default to 100.
+A mailet that helps to email all users in the system.
 
 ==== Sample configuration
 
@@ -16,9 +10,7 @@ The number of recipients to include in each batch. Optional, 
default to 100.
     <matcher match="[email protected]"/>
     <matcher match="[email protected]"/>
 </matcher>
-<mailet match="notify-matcher" class="MailToAllUsers">
-    <batchSize>100</batchSize>
-</mailet>
+<mailet match="notify-matcher" class="MailToAllUsers"/>
 ----
 
 
diff --git a/docs/modules/servers/partials/SplitMail.adoc 
b/docs/modules/servers/partials/SplitMail.adoc
new file mode 100644
index 0000000000..0ac2be5d3e
--- /dev/null
+++ b/docs/modules/servers/partials/SplitMail.adoc
@@ -0,0 +1,16 @@
+=== SplitMail
+
+A mailet to split emails with too many recipients. The first batch is sent 
directly, while the remaining batches are sent asynchronously.
+The batch size can be configured via the *batchSize* parameter (optional, 
defaults to 100).
+
+==== Sample configuration
+
+[source,xml]
+----
+<mailet match="notify-matcher" class="SplitMail">
+  <batchSize>100</batchSize>
+</mailet>
+----
+
+
+
diff --git a/docs/modules/servers/partials/configure/mailets.adoc 
b/docs/modules/servers/partials/configure/mailets.adoc
index 7a8b690a6d..adcdff20d6 100644
--- a/docs/modules/servers/partials/configure/mailets.adoc
+++ b/docs/modules/servers/partials/configure/mailets.adoc
@@ -45,6 +45,8 @@ include::partial$MailAttributesListToMimeHeaders.adoc[]
 
 include::partial$MailAttributesToMimeHeaders.adoc[]
 
+include::partial$MailToAllUsers.adoc[]
+
 include::partial$MetricsMailet.adoc[]
 
 include::partial$MimeDecodingMailet.adoc[]
@@ -97,6 +99,8 @@ include::partial$SpamAssassin.adoc[]
 
 include::partial$StripAttachment.adoc[]
 
+include::partial$SplitMail.adoc[]
+
 include::partial$TextCalendarBodyToAttachment.adoc[]
 
 include::partial$ToProcessor.adoc[]
diff --git 
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java
 
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java
index 46a28d42c0..4d63c58cd5 100644
--- 
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java
+++ 
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java
@@ -19,30 +19,20 @@
 
 package org.apache.james.transport.mailets;
 
-import java.util.Optional;
-import java.util.function.Function;
-
 import jakarta.inject.Inject;
 import jakarta.mail.MessagingException;
 
-import org.apache.james.core.MailAddress;
 import org.apache.james.core.Username;
-import org.apache.james.lifecycle.api.LifecycleUtil;
 import org.apache.james.user.api.UsersRepository;
 import org.apache.mailet.Mail;
 import org.apache.mailet.base.GenericMailet;
-import org.reactivestreams.Publisher;
 
 import com.github.fge.lambdas.Throwing;
 
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
 
 /**
- * A mailet that helps to email all users in the system. The emails are sent 
in batches to manage
- * the load. The first batch is sent directly, while the remaining batches are 
sent asynchronously.
- * The batch size can be configured via the <b>batchSize</b> parameter 
(optional, defaults to 100).
+ * A mailet that helps to email all users in the system.
  *
  * <h3>Configuration</h3>
  * <pre><code>
@@ -51,70 +41,26 @@ import reactor.util.function.Tuple2;
  *     <matcher match="[email protected]"/>
  *     <matcher match="[email protected]"/>
  * </matcher>
- * <mailet match="notify-matcher" class="MailToAllUsers">
- *     <batchSize>100</batchSize>
- * </mailet>
+ * <mailet match="notify-matcher" class="MailToAllUsers"/>
  * }
  * </code></pre>
  *
  */
 public class MailToAllUsers extends GenericMailet {
-    private static final int DEFAULT_BATCH_SIZE = 100;
     private final UsersRepository usersRepository;
-    private int batchSize;
 
     @Inject
     public MailToAllUsers(UsersRepository usersRepository) {
         this.usersRepository = usersRepository;
     }
 
-    @Override
-    public void init() throws MessagingException {
-        batchSize = 
Integer.parseInt(Optional.ofNullable(getInitParameter("batchSize"))
-            .orElse(String.valueOf(DEFAULT_BATCH_SIZE)));
-    }
-
     @Override
     public void service(Mail mail) throws MessagingException {
         Flux.from(usersRepository.listReactive())
             .map(Throwing.function(Username::asMailAddress))
-            .window(batchSize)
-            .index()
-            .flatMap(sendMail(mail))
-            .then()
-            .block();
-    }
-
-    private Function<Tuple2<Long, Flux<MailAddress>>, Publisher<Void>> 
sendMail(Mail mail) {
-        return tuple -> {
-            boolean firstBatch = tuple.getT1() == 0;
-            if (firstBatch) {
-                return sendMailToFirstRecipientsBatchDirectly(mail, 
tuple.getT2());
-            }
-            return sendMailToRemainingRecipientsBatchAsynchronously(mail, 
tuple.getT2());
-        };
-    }
-
-    private Mono<Void> sendMailToFirstRecipientsBatchDirectly(Mail mail, 
Flux<MailAddress> firstRecipientsBatch) {
-        return firstRecipientsBatch
-            .collectList()
-            .flatMap(recipients -> Mono.fromRunnable(() -> 
mail.setRecipients(recipients)))
-            .then();
-    }
-
-    private Mono<Void> sendMailToRemainingRecipientsBatchAsynchronously(Mail 
mail, Flux<MailAddress> remainingRecipientsBatch) {
-        return remainingRecipientsBatch
             .collectList()
-            .flatMap(recipients -> Mono.fromRunnable(Throwing.runnable(() -> {
-                Mail duplicateMail = mail.duplicate();
-                try {
-                    duplicateMail.setRecipients(recipients);
-                    getMailetContext().sendMail(duplicateMail);
-                } finally {
-                    LifecycleUtil.dispose(duplicateMail);
-                }
-            })))
-            .then();
+            .doOnNext(mail::setRecipients)
+            .block();
     }
 
     @Override
diff --git 
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java
 
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/SplitMail.java
similarity index 81%
copy from 
server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java
copy to 
server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/SplitMail.java
index 46a28d42c0..b1a746e3d9 100644
--- 
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java
+++ 
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/SplitMail.java
@@ -22,13 +22,10 @@ package org.apache.james.transport.mailets;
 import java.util.Optional;
 import java.util.function.Function;
 
-import jakarta.inject.Inject;
 import jakarta.mail.MessagingException;
 
 import org.apache.james.core.MailAddress;
-import org.apache.james.core.Username;
 import org.apache.james.lifecycle.api.LifecycleUtil;
-import org.apache.james.user.api.UsersRepository;
 import org.apache.mailet.Mail;
 import org.apache.mailet.base.GenericMailet;
 import org.reactivestreams.Publisher;
@@ -40,8 +37,7 @@ import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
 
 /**
- * A mailet that helps to email all users in the system. The emails are sent 
in batches to manage
- * the load. The first batch is sent directly, while the remaining batches are 
sent asynchronously.
+ * A mailet to split emails with too many recipients. The first batch is sent 
directly, while the remaining batches are sent asynchronously.
  * The batch size can be configured via the <b>batchSize</b> parameter 
(optional, defaults to 100).
  *
  * <h3>Configuration</h3>
@@ -51,23 +47,17 @@ import reactor.util.function.Tuple2;
  *     <matcher match="[email protected]"/>
  *     <matcher match="[email protected]"/>
  * </matcher>
- * <mailet match="notify-matcher" class="MailToAllUsers">
+ * <mailet match="notify-matcher" class="SplitMail">
  *     <batchSize>100</batchSize>
  * </mailet>
  * }
  * </code></pre>
  *
  */
-public class MailToAllUsers extends GenericMailet {
+public class SplitMail extends GenericMailet {
     private static final int DEFAULT_BATCH_SIZE = 100;
-    private final UsersRepository usersRepository;
     private int batchSize;
 
-    @Inject
-    public MailToAllUsers(UsersRepository usersRepository) {
-        this.usersRepository = usersRepository;
-    }
-
     @Override
     public void init() throws MessagingException {
         batchSize = 
Integer.parseInt(Optional.ofNullable(getInitParameter("batchSize"))
@@ -76,13 +66,14 @@ public class MailToAllUsers extends GenericMailet {
 
     @Override
     public void service(Mail mail) throws MessagingException {
-        Flux.from(usersRepository.listReactive())
-            .map(Throwing.function(Username::asMailAddress))
-            .window(batchSize)
-            .index()
-            .flatMap(sendMail(mail))
-            .then()
-            .block();
+        if (mail.getRecipients().stream().count() > batchSize) {
+            Flux.fromIterable(mail.getRecipients())
+                .window(batchSize)
+                .index()
+                .flatMap(sendMail(mail))
+                .then()
+                .block();
+        }
     }
 
     private Function<Tuple2<Long, Flux<MailAddress>>, Publisher<Void>> 
sendMail(Mail mail) {
@@ -119,6 +110,6 @@ public class MailToAllUsers extends GenericMailet {
 
     @Override
     public String getMailetName() {
-        return "MailToAllUsers";
+        return "SplitMail";
     }
 }
diff --git 
a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java
 
b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java
index be7dd91657..dc97da6a5c 100644
--- 
a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java
+++ 
b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java
@@ -21,7 +21,6 @@ package org.apache.james.transport.mailets;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import jakarta.mail.MessagingException;
@@ -30,13 +29,9 @@ import org.apache.james.core.Username;
 import org.apache.james.core.builder.MimeMessageBuilder;
 import org.apache.james.user.api.UsersRepository;
 import org.apache.mailet.Mail;
-import org.apache.mailet.MailetContext;
 import org.apache.mailet.base.test.FakeMail;
-import org.apache.mailet.base.test.FakeMailetConfig;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
 
 import reactor.core.publisher.Flux;
 
@@ -47,51 +42,27 @@ class MailToAllUsersTest {
     private static final Username USER_4 = Username.of("[email protected]");
 
     private UsersRepository usersRepository;
-    private MailetContext mailetContext;
     private MailToAllUsers testee;
 
     @BeforeEach
-    void setUp() {
+    void setUp() throws Exception {
         usersRepository = mock(UsersRepository.class);
-        mailetContext = Mockito.mock(MailetContext.class);
         testee = new MailToAllUsers(usersRepository);
     }
 
     @Test
-    void firstUsersBatchShouldBeSentDirectly() throws Exception {
+    void shouldSendAMailToAllUsers() throws Exception {
         when(usersRepository.listReactive())
             .thenReturn(Flux.just(USER_1, USER_2, USER_3, USER_4));
 
-        testee.init(FakeMailetConfig.builder()
-            .mailetContext(mailetContext)
-            .setProperty("batchSize", "2")
-            .build());
-
         Mail originalMail = createMail();
         testee.service(originalMail);
 
         assertThat(originalMail.getRecipients())
-            .containsExactlyInAnyOrder(USER_1.asMailAddress(), 
USER_2.asMailAddress());
+            .containsExactlyInAnyOrder(USER_1.asMailAddress(), 
USER_2.asMailAddress(),
+                USER_3.asMailAddress(), USER_4.asMailAddress());
     }
 
-    @Test
-    void remainingUsersBatchesShouldBeSentAsync() throws Exception {
-        when(usersRepository.listReactive())
-            .thenReturn(Flux.just(USER_1, USER_2, USER_3, USER_4));
-
-        testee.init(FakeMailetConfig.builder()
-            .mailetContext(mailetContext)
-            .setProperty("batchSize", "2")
-            .build());
-
-        Mail originalMail = createMail();
-        testee.service(originalMail);
-
-        ArgumentCaptor<Mail> mailCaptor = ArgumentCaptor.forClass(Mail.class);
-        verify(mailetContext).sendMail(mailCaptor.capture());
-        assertThat(mailCaptor.getValue().getRecipients())
-            .containsExactly(USER_3.asMailAddress(), USER_4.asMailAddress());
-    }
 
     private Mail createMail() throws MessagingException {
         return FakeMail.builder()
diff --git 
a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java
 
b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/SplitMailTest.java
similarity index 59%
copy from 
server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java
copy to 
server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/SplitMailTest.java
index be7dd91657..2dbac525a2 100644
--- 
a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java
+++ 
b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/SplitMailTest.java
@@ -20,55 +20,61 @@
 package org.apache.james.transport.mailets;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 import jakarta.mail.MessagingException;
 
 import org.apache.james.core.Username;
 import org.apache.james.core.builder.MimeMessageBuilder;
-import org.apache.james.user.api.UsersRepository;
 import org.apache.mailet.Mail;
-import org.apache.mailet.MailetContext;
 import org.apache.mailet.base.test.FakeMail;
+import org.apache.mailet.base.test.FakeMailContext;
 import org.apache.mailet.base.test.FakeMailetConfig;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
 
-import reactor.core.publisher.Flux;
+import com.google.common.collect.ImmutableList;
 
-class MailToAllUsersTest {
+class SplitMailTest {
     private static final Username USER_1 = Username.of("[email protected]");
     private static final Username USER_2 = Username.of("[email protected]");
     private static final Username USER_3 = Username.of("[email protected]");
     private static final Username USER_4 = Username.of("[email protected]");
+    private static final Username USER_5 = Username.of("[email protected]");
 
-    private UsersRepository usersRepository;
-    private MailetContext mailetContext;
-    private MailToAllUsers testee;
+    private FakeMailContext mailetContext;
+    private SplitMail splitMail;
 
     @BeforeEach
-    void setUp() {
-        usersRepository = mock(UsersRepository.class);
-        mailetContext = Mockito.mock(MailetContext.class);
-        testee = new MailToAllUsers(usersRepository);
+    void setUp() throws Exception {
+        mailetContext = FakeMailContext.defaultContext();
+        splitMail = new SplitMail();
+        splitMail.init(FakeMailetConfig.builder()
+            .mailetContext(mailetContext)
+            .setProperty("batchSize", "2")
+            .build());
     }
 
     @Test
-    void firstUsersBatchShouldBeSentDirectly() throws Exception {
-        when(usersRepository.listReactive())
-            .thenReturn(Flux.just(USER_1, USER_2, USER_3, USER_4));
+    void firstUsersBatchShouldBeSentDirectlyWhenExactlyBatchSize() throws 
Exception {
+        Mail originalMail = createMail();
+        originalMail.setRecipients(ImmutableList.of(USER_1.asMailAddress(),
+            USER_2.asMailAddress()));
 
-        testee.init(FakeMailetConfig.builder()
-            .mailetContext(mailetContext)
-            .setProperty("batchSize", "2")
-            .build());
+        splitMail.service(originalMail);
+
+        assertThat(originalMail.getRecipients())
+            .containsExactlyInAnyOrder(USER_1.asMailAddress(), 
USER_2.asMailAddress());
+    }
 
+    @Test
+    void firstUsersBatchShouldBeSentDirectly() throws Exception {
         Mail originalMail = createMail();
-        testee.service(originalMail);
+        originalMail.setRecipients(ImmutableList.of(USER_1.asMailAddress(),
+            USER_2.asMailAddress(),
+            USER_3.asMailAddress(),
+            USER_4.asMailAddress()));
+
+        splitMail.service(originalMail);
 
         assertThat(originalMail.getRecipients())
             .containsExactlyInAnyOrder(USER_1.asMailAddress(), 
USER_2.asMailAddress());
@@ -76,21 +82,32 @@ class MailToAllUsersTest {
 
     @Test
     void remainingUsersBatchesShouldBeSentAsync() throws Exception {
-        when(usersRepository.listReactive())
-            .thenReturn(Flux.just(USER_1, USER_2, USER_3, USER_4));
+        Mail originalMail = createMail();
+        originalMail.setRecipients(ImmutableList.of(USER_1.asMailAddress(),
+            USER_2.asMailAddress(),
+            USER_3.asMailAddress(),
+            USER_4.asMailAddress()));
 
-        testee.init(FakeMailetConfig.builder()
-            .mailetContext(mailetContext)
-            .setProperty("batchSize", "2")
-            .build());
+        splitMail.service(originalMail);
 
+        assertThat(mailetContext.getSentMails().getFirst().getRecipients())
+            .containsExactly(USER_3.asMailAddress(), USER_4.asMailAddress());
+    }
+
+    @Test
+    void remainingUsersBatchesShouldBeSentAsyncInSeveralBatches() throws 
Exception {
         Mail originalMail = createMail();
-        testee.service(originalMail);
+        originalMail.setRecipients(ImmutableList.of(USER_1.asMailAddress(),
+            USER_2.asMailAddress(),
+            USER_3.asMailAddress(),
+            USER_4.asMailAddress(),
+            USER_5.asMailAddress()));
 
-        ArgumentCaptor<Mail> mailCaptor = ArgumentCaptor.forClass(Mail.class);
-        verify(mailetContext).sendMail(mailCaptor.capture());
-        assertThat(mailCaptor.getValue().getRecipients())
-            .containsExactly(USER_3.asMailAddress(), USER_4.asMailAddress());
+        splitMail.service(originalMail);
+
+        assertThat(mailetContext.getSentMails())
+            .hasSize(2)
+            .allMatch(mail -> mail.getRecipients().size() <= 2);
     }
 
     private Mail createMail() throws MessagingException {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to