This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 340b31f0da JAMES-4081 Implement MailToAllUsers mailet (#2469)
340b31f0da is described below
commit 340b31f0da3a94ed4a6cbf200a69e67277b2a578
Author: Trần Hồng Quân <[email protected]>
AuthorDate: Mon Oct 28 10:47:24 2024 +0700
JAMES-4081 Implement MailToAllUsers mailet (#2469)
---
docs/modules/servers/partials/MailToAllUsers.adoc | 25 +++++
.../james/transport/mailets/MailToAllUsers.java | 124 +++++++++++++++++++++
.../transport/mailets/MailToAllUsersTest.java | 106 ++++++++++++++++++
3 files changed, 255 insertions(+)
diff --git a/docs/modules/servers/partials/MailToAllUsers.adoc
b/docs/modules/servers/partials/MailToAllUsers.adoc
new file mode 100644
index 0000000000..caf5516d3f
--- /dev/null
+++ b/docs/modules/servers/partials/MailToAllUsers.adoc
@@ -0,0 +1,25 @@
+=== MailToAllUsers
+
+A mailet that sends an email to all users in the system. The emails are sent in batches to manage
+the load. The first batch is sent directly, while the remaining batches are sent asynchronously.
+
+==== Configuration parameters
+
+*batchSize*::
+The number of recipients to include in each batch. Optional, defaults to 100.
+
+==== Sample configuration
+
+[source,xml]
+----
+<matcher name="notify-matcher"
match="org.apache.james.mailetcontainer.impl.matchers.And">
+ <matcher match="[email protected]"/>
+ <matcher match="[email protected]"/>
+</matcher>
+<mailet match="notify-matcher" class="MailToAllUsers">
+ <batchSize>100</batchSize>
+</mailet>
+----
+
+
+
diff --git
a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java
new file mode 100644
index 0000000000..46a28d42c0
--- /dev/null
+++
b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/MailToAllUsers.java
@@ -0,0 +1,124 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.transport.mailets;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+import jakarta.inject.Inject;
+import jakarta.mail.MessagingException;
+
+import org.apache.james.core.MailAddress;
+import org.apache.james.core.Username;
+import org.apache.james.lifecycle.api.LifecycleUtil;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.mailet.Mail;
+import org.apache.mailet.base.GenericMailet;
+import org.reactivestreams.Publisher;
+
+import com.github.fge.lambdas.Throwing;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+
+/**
+ * A mailet that helps to email all users in the system. The emails are sent
in batches to manage
+ * the load. The first batch is sent directly, while the remaining batches are
sent asynchronously.
+ * The batch size can be configured via the <b>batchSize</b> parameter
(optional, defaults to 100).
+ *
+ * <h3>Configuration</h3>
+ * <pre><code>
+ * {@code
+ * <matcher name="notify-matcher"
match="org.apache.james.mailetcontainer.impl.matchers.And">
+ * <matcher match="[email protected]"/>
+ * <matcher match="[email protected]"/>
+ * </matcher>
+ * <mailet match="notify-matcher" class="MailToAllUsers">
+ * <batchSize>100</batchSize>
+ * </mailet>
+ * }
+ * </code></pre>
+ *
+ */
+public class MailToAllUsers extends GenericMailet {
+    private static final int DEFAULT_BATCH_SIZE = 100;
+    private static final String BATCH_SIZE_PARAMETER = "batchSize";
+
+    private final UsersRepository usersRepository;
+    // Defaults to DEFAULT_BATCH_SIZE so the windowing in service() never sees 0,
+    // even if init() has not assigned a configured value yet.
+    private int batchSize = DEFAULT_BATCH_SIZE;
+
+    @Inject
+    public MailToAllUsers(UsersRepository usersRepository) {
+        this.usersRepository = usersRepository;
+    }
+
+    /**
+     * Reads the optional {@code batchSize} init parameter, falling back to
+     * {@link #DEFAULT_BATCH_SIZE} when it is absent.
+     *
+     * @throws MessagingException if the configured value is not an integer or is lower than 1
+     */
+    @Override
+    public void init() throws MessagingException {
+        String configuredBatchSize = Optional.ofNullable(getInitParameter(BATCH_SIZE_PARAMETER))
+            .orElse(String.valueOf(DEFAULT_BATCH_SIZE));
+        batchSize = parseBatchSize(configuredBatchSize);
+    }
+
+    /**
+     * Converts the raw configuration value into a strictly positive batch size.
+     *
+     * @param rawValue textual value of the {@code batchSize} parameter
+     * @return the parsed batch size, always greater than or equal to 1
+     * @throws MessagingException if {@code rawValue} is not an integer or is lower than 1
+     */
+    private int parseBatchSize(String rawValue) throws MessagingException {
+        int parsedBatchSize;
+        try {
+            parsedBatchSize = Integer.parseInt(rawValue);
+        } catch (NumberFormatException e) {
+            // Report configuration mistakes through the declared checked exception
+            // instead of leaking an unchecked NumberFormatException out of init().
+            throw new MessagingException("'" + BATCH_SIZE_PARAMETER + "' must be a strictly positive integer but was '" + rawValue + "'", e);
+        }
+        if (parsedBatchSize < 1) {
+            // Flux.window(int) rejects non-positive sizes; fail at init time rather than
+            // on the first mail being processed.
+            throw new MessagingException("'" + BATCH_SIZE_PARAMETER + "' must be a strictly positive integer but was '" + rawValue + "'");
+        }
+        return parsedBatchSize;
+    }
+
+    /**
+     * Lists every user of the repository, converts each username to a mail address,
+     * groups the addresses in windows of {@code batchSize} and dispatches each window
+     * through {@link #sendMail(Mail)}. Blocks until all windows have been handled.
+     *
+     * @param mail the mail to send to all users
+     * @throws MessagingException declared by the mailet contract
+     */
+    @Override
+    public void service(Mail mail) throws MessagingException {
+        Flux.from(usersRepository.listReactive())
+            .map(Throwing.function(Username::asMailAddress))
+            .window(batchSize)
+            // index() pairs each window with its position so the first one can be told apart
+            .index()
+            .flatMap(sendMail(mail))
+            .then()
+            .block();
+    }
+
+    /**
+     * Builds the dispatch function applied to each indexed window of recipients:
+     * window 0 is applied to the processed mail itself, every other window is sent
+     * as a duplicate of that mail.
+     */
+    private Function<Tuple2<Long, Flux<MailAddress>>, Publisher<Void>> sendMail(Mail mail) {
+        return tuple -> {
+            boolean firstBatch = tuple.getT1() == 0;
+            if (firstBatch) {
+                return sendMailToFirstRecipientsBatchDirectly(mail, tuple.getT2());
+            }
+            return sendMailToRemainingRecipientsBatchAsynchronously(mail, tuple.getT2());
+        };
+    }
+
+    /**
+     * Replaces the recipients of the processed mail with the first batch of addresses.
+     * Nothing is sent through the mailet context here.
+     */
+    private Mono<Void> sendMailToFirstRecipientsBatchDirectly(Mail mail, Flux<MailAddress> firstRecipientsBatch) {
+        return firstRecipientsBatch
+            .collectList()
+            .flatMap(recipients -> Mono.fromRunnable(() -> mail.setRecipients(recipients)))
+            .then();
+    }
+
+    /**
+     * Duplicates the processed mail, addresses the copy to the given batch and hands it
+     * to the mailet context. The copy is always disposed afterwards, even when sending fails.
+     */
+    private Mono<Void> sendMailToRemainingRecipientsBatchAsynchronously(Mail mail, Flux<MailAddress> remainingRecipientsBatch) {
+        return remainingRecipientsBatch
+            .collectList()
+            .flatMap(recipients -> Mono.fromRunnable(Throwing.runnable(() -> {
+                Mail duplicateMail = mail.duplicate();
+                try {
+                    duplicateMail.setRecipients(recipients);
+                    getMailetContext().sendMail(duplicateMail);
+                } finally {
+                    LifecycleUtil.dispose(duplicateMail);
+                }
+            })))
+            .then();
+    }
+
+    @Override
+    public String getMailetName() {
+        return "MailToAllUsers";
+    }
+}
diff --git
a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java
b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java
new file mode 100644
index 0000000000..be7dd91657
--- /dev/null
+++
b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/MailToAllUsersTest.java
@@ -0,0 +1,106 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.transport.mailets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import jakarta.mail.MessagingException;
+
+import org.apache.james.core.Username;
+import org.apache.james.core.builder.MimeMessageBuilder;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.mailet.Mail;
+import org.apache.mailet.MailetContext;
+import org.apache.mailet.base.test.FakeMail;
+import org.apache.mailet.base.test.FakeMailetConfig;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import reactor.core.publisher.Flux;
+
+class MailToAllUsersTest {
+    private static final Username USER_1 = Username.of("[email protected]");
+    private static final Username USER_2 = Username.of("[email protected]");
+    private static final Username USER_3 = Username.of("[email protected]");
+    private static final Username USER_4 = Username.of("[email protected]");
+
+    private UsersRepository usersRepository;
+    private MailetContext mailetContext;
+    private MailToAllUsers testee;
+
+    @BeforeEach
+    void setUp() {
+        mailetContext = Mockito.mock(MailetContext.class);
+        usersRepository = mock(UsersRepository.class);
+        testee = new MailToAllUsers(usersRepository);
+    }
+
+    @Test
+    void firstUsersBatchShouldBeSentDirectly() throws Exception {
+        givenFourUsersSplitInBatchesOfTwo();
+
+        Mail processedMail = createMail();
+        testee.service(processedMail);
+
+        // The first window of users replaces the recipients of the processed mail itself.
+        assertThat(processedMail.getRecipients())
+            .containsExactlyInAnyOrder(USER_1.asMailAddress(), USER_2.asMailAddress());
+    }
+
+    @Test
+    void remainingUsersBatchesShouldBeSentAsync() throws Exception {
+        givenFourUsersSplitInBatchesOfTwo();
+
+        Mail processedMail = createMail();
+        testee.service(processedMail);
+
+        // The second window of users is sent as a separate mail through the mailet context.
+        ArgumentCaptor<Mail> sentMail = ArgumentCaptor.forClass(Mail.class);
+        verify(mailetContext).sendMail(sentMail.capture());
+        Mail duplicatedMail = sentMail.getValue();
+        assertThat(duplicatedMail.getRecipients())
+            .containsExactly(USER_3.asMailAddress(), USER_4.asMailAddress());
+    }
+
+    /**
+     * Stubs the repository with four users and initialises the mailet with a batch size of 2,
+     * so that exactly two batches are produced.
+     */
+    private void givenFourUsersSplitInBatchesOfTwo() throws MessagingException {
+        when(usersRepository.listReactive())
+            .thenReturn(Flux.just(USER_1, USER_2, USER_3, USER_4));
+
+        FakeMailetConfig mailetConfig = FakeMailetConfig.builder()
+            .mailetContext(mailetContext)
+            .setProperty("batchSize", "2")
+            .build();
+        testee.init(mailetConfig);
+    }
+
+    /**
+     * Builds the mail handed to the mailet under test.
+     */
+    private Mail createMail() throws MessagingException {
+        MimeMessageBuilder message = MimeMessageBuilder.mimeMessageBuilder()
+            .setSender("[email protected]")
+            .setSubject("Hi all")
+            .addToRecipient("[email protected]");
+
+        return FakeMail.builder()
+            .name("name")
+            .mimeMessage(message)
+            .recipient("[email protected]")
+            .build();
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]