This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 48d22287d3e68343cd7c96839991507bd452c424 Author: Tung Tran <[email protected]> AuthorDate: Wed Sep 18 16:42:44 2024 +0700 AmqpForwardAttribute mailet supports declaring exchange types - integration test --- ...ibuteWithFanoutExchangeTypeIntegrationTest.java | 148 +++++++++++++++++++++ .../transport/mailets/amqp/AmqpExtension.java | 10 +- 2 files changed, 156 insertions(+), 2 deletions(-) diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeWithFanoutExchangeTypeIntegrationTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeWithFanoutExchangeTypeIntegrationTest.java new file mode 100644 index 0000000000..9ec3a3c8a0 --- /dev/null +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeWithFanoutExchangeTypeIntegrationTest.java @@ -0,0 +1,148 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.transport.mailets; + +import static net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson; +import static org.apache.james.mailets.configuration.Constants.DEFAULT_DOMAIN; +import static org.apache.james.mailets.configuration.Constants.LOCALHOST_IP; +import static org.apache.james.mailets.configuration.Constants.PASSWORD; +import static org.apache.james.mailets.configuration.Constants.awaitAtMostOneMinute; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.util.Optional; + +import org.apache.james.MemoryJamesServerMain; +import org.apache.james.core.builder.MimeMessageBuilder; +import org.apache.james.mailets.TemporaryJamesServer; +import org.apache.james.mailets.configuration.CommonProcessors; +import org.apache.james.mailets.configuration.MailetConfiguration; +import org.apache.james.mailets.configuration.MailetContainer; +import org.apache.james.mailets.configuration.ProcessorConfiguration; +import org.apache.james.modules.protocols.ImapGuiceProbe; +import org.apache.james.modules.protocols.SmtpGuiceProbe; +import org.apache.james.transport.mailets.amqp.AmqpExtension; +import org.apache.james.transport.matchers.All; +import org.apache.james.transport.matchers.SMTPAuthSuccessful; +import org.apache.james.utils.DataProbeImpl; +import org.apache.james.utils.SMTPMessageSender; +import org.apache.james.utils.TestIMAPClient; +import org.apache.mailet.base.test.FakeMail; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +import com.rabbitmq.client.BuiltinExchangeType; + +class AmqpForwardAttributeWithFanoutExchangeTypeIntegrationTest { + public static final String SENDER = "sender@" + DEFAULT_DOMAIN; + public static final String TO = "to@" + DEFAULT_DOMAIN; + public static final String TO2 = "to2@" + DEFAULT_DOMAIN; + public static final String CC = "cc@" + DEFAULT_DOMAIN; + public static final String CC2 = "cc2@" + DEFAULT_DOMAIN; + public static final String BCC = "bcc@" + DEFAULT_DOMAIN; + public static final String BCC2 = "bcc2@" + DEFAULT_DOMAIN; + public static final String EXCHANGE = "collector:email"; + public static final String ROUTING_KEY = "routing1"; + + @RegisterExtension + public static AmqpExtension amqpExtension = new AmqpExtension(EXCHANGE, ROUTING_KEY, BuiltinExchangeType.FANOUT); + @RegisterExtension + public TestIMAPClient testIMAPClient = new TestIMAPClient(); + @RegisterExtension + public SMTPMessageSender messageSender = new SMTPMessageSender(DEFAULT_DOMAIN); + + private TemporaryJamesServer jamesServer; + + @BeforeEach + void setup(@TempDir File temporaryFolder) throws Exception { + String attribute = "ExtractedContacts"; + MailetContainer.Builder mailets = TemporaryJamesServer.defaultMailetContainerConfiguration() + .postmaster(SENDER) + .putProcessor( + ProcessorConfiguration.transport() + .addMailet(MailetConfiguration.builder() + .matcher(SMTPAuthSuccessful.class) + .mailet(ContactExtractor.class) + .addProperty(ContactExtractor.Configuration.ATTRIBUTE, attribute)) + .addMailet(MailetConfiguration.builder() + .matcher(All.class) + .mailet(AmqpForwardAttribute.class) + .addProperty(AmqpForwardAttribute.URI_PARAMETER_NAME, amqpExtension.getAmqpUri()) + .addProperty(AmqpForwardAttribute.EXCHANGE_PARAMETER_NAME, EXCHANGE) + .addProperty(AmqpForwardAttribute.EXCHANGE_TYPE_PARAMETER_NAME, BuiltinExchangeType.FANOUT.getType()) + .addProperty(AmqpForwardAttribute.ATTRIBUTE_PARAMETER_NAME, attribute)) + .addMailetsFrom(CommonProcessors.deliverOnlyTransport())); + + jamesServer = TemporaryJamesServer.builder() + .withBase(MemoryJamesServerMain.SMTP_AND_IMAP_MODULE) + .withMailetContainer(mailets) + .build(temporaryFolder); + jamesServer.start(); + + jamesServer.getProbe(DataProbeImpl.class) + .fluent() + .addDomain(DEFAULT_DOMAIN) + .addUser(SENDER, PASSWORD) + .addUser(TO, PASSWORD) + .addUser(TO2, PASSWORD) + .addUser(CC, PASSWORD) + .addUser(CC2, PASSWORD) + .addUser(BCC, PASSWORD) + .addUser(BCC2, PASSWORD); + } + + @AfterEach + void tearDown() { + jamesServer.shutdown(); + } + + @Test + void recipientsShouldBePublishedToAmqpWhenSendingEmail() throws Exception { + MimeMessageBuilder message = MimeMessageBuilder.mimeMessageBuilder() + .setSender(SENDER) + .addToRecipient(TO, "John To2 <" + TO2 + ">") + .addCcRecipient(CC, "John Cc2 <" + CC2 + ">") + .addBccRecipient(BCC, "John Bcc2 <" + BCC2 + ">") + .setSubject("Contact collection Rocks") + .setText("This is my email"); + messageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort()) + .authenticate(SENDER, PASSWORD) + .sendMessage(FakeMail.builder() + .name("name") + .mimeMessage(message) + .sender(SENDER) + .recipients(TO, TO2, CC, CC2, BCC, BCC2)); + + testIMAPClient.connect(LOCALHOST_IP, jamesServer.getProbe(ImapGuiceProbe.class).getImapPort()) + .login(TO, PASSWORD) + .select(TestIMAPClient.INBOX) + .awaitMessage(awaitAtMostOneMinute); + + Optional<String> actual = amqpExtension.readContent(); + assertThat(actual).isNotEmpty(); + assertThatJson(actual.get()).isEqualTo("{" + + "\"userEmail\" : \"[email protected]\", " + + "\"emails\" : [ \"[email protected]\", \"John To2 <[email protected]>\", \"[email protected]\", \"John Cc2 <[email protected]>\", \"[email protected]\", \"John Bcc2 <[email protected]>\" ]" + + "}"); + } + +} diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/amqp/AmqpExtension.java b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/amqp/AmqpExtension.java index e7a1fbca37..7e25a7ced7 100644 --- a/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/amqp/AmqpExtension.java +++ b/server/mailet/integration-testing/src/test/java/org/apache/james/transport/mailets/amqp/AmqpExtension.java @@ -50,13 +50,14 @@ public class AmqpExtension implements BeforeAllCallback, AfterAllCallback, After private final DockerContainer rabbitMqContainer; private final String exchangeName; + private final BuiltinExchangeType exchangeType; private final String routingKey; private Channel channel; private String queueName; private Connection connection; private String amqpUri; - public AmqpExtension(String exchangeName, String routingKey) { + public AmqpExtension(String exchangeName, String routingKey, BuiltinExchangeType exchangeType) { this.rabbitMqContainer = DockerContainer.fromName(Images.RABBITMQ) .withAffinityToContainer() .waitingFor(new HostPortWaitStrategy() @@ -64,6 +65,11 @@ public class AmqpExtension implements BeforeAllCallback, AfterAllCallback, After .withLogConsumer(AmqpExtension::displayDockerLog); this.exchangeName = exchangeName; this.routingKey = routingKey; + this.exchangeType = exchangeType; + } + + public AmqpExtension(String exchangeName, String routingKey) { + this(exchangeName, routingKey, BuiltinExchangeType.DIRECT); } private static void displayDockerLog(OutputFrame outputFrame) { @@ -79,7 +85,7 @@ public class AmqpExtension implements BeforeAllCallback, AfterAllCallback, After waitingForRabbitToBeReady(factory); connection = factory.newConnection(); channel = connection.createChannel(); - channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT); + channel.exchangeDeclare(exchangeName, exchangeType); queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, exchangeName, routingKey); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
