This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 59e9d8fcdc3ad7d3c05d24ef2972ab3881466810 Author: Tung Tran <[email protected]> AuthorDate: Wed Sep 18 16:24:33 2024 +0700 AmqpForwardAttribute mailet supports declaring exchange types. --- .../servers/partials/AmqpForwardAttribute.adoc | 1 + .../transport/mailets/AmqpForwardAttribute.java | 20 +++++++++++++- .../mailets/AmqpForwardAttributeTest.java | 32 ++++++++++++++++++++++ 3 files changed, 52 insertions(+), 1 deletion(-) diff --git a/docs/modules/servers/partials/AmqpForwardAttribute.adoc b/docs/modules/servers/partials/AmqpForwardAttribute.adoc index 8ce53b575a..46b4e79460 100644 --- a/docs/modules/servers/partials/AmqpForwardAttribute.adoc +++ b/docs/modules/servers/partials/AmqpForwardAttribute.adoc @@ -9,6 +9,7 @@ where the byte[] content is issued from a MimeBodyPart. It is typically generated from the StripAttachment mailet. * uri (mandatory): AMQP URI defining the server where to send the attachment. * exchange (mandatory): name of the AMQP exchange. +* exchange_type (optional, default to "direct"): type of the exchange. Can be "direct", "fanout", "topic", "headers". * routing_key (optional, default to empty string): name of the routing key on this exchange. This mailet will sent the data attached to the mail as an attribute holding a map. \ No newline at end of file diff --git a/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java b/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java index 1a8198dba2..5c0dc09a77 100644 --- a/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java +++ b/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java @@ -23,6 +23,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; @@ -31,6 +32,7 @@ import java.util.stream.Stream; import jakarta.annotation.PreDestroy; import jakarta.inject.Inject; +import org.apache.commons.lang3.StringUtils; import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; import org.apache.james.backends.rabbitmq.RabbitMQConnectionFactory; import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; @@ -55,6 +57,7 @@ import com.google.common.base.Splitter; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.rabbitmq.client.AlreadyClosedException; +import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.ConnectionFactory; import reactor.core.publisher.Flux; @@ -72,6 +75,7 @@ import reactor.rabbitmq.Sender; * It is typically generated from the StripAttachment mailet.</li> * <li>uri (mandatory): AMQP URI defining the server where to send the attachment.</li> * <li>exchange (mandatory): name of the AMQP exchange.</li> + * <li>exchange_type (optional, default to "direct"): type of the exchange. Valid values are: direct, fanout, topic, headers.</li> * <li>routing_key (optional, default to empty string): name of the routing key on this exchange.</li> * </ul> * @@ -90,11 +94,13 @@ public class AmqpForwardAttribute extends GenericMailet { private static final String DEFAULT_USER = "guest"; private static final String DEFAULT_PASSWORD_STRING = "guest"; private static final char[] DEFAULT_PASSWORD = DEFAULT_PASSWORD_STRING.toCharArray(); + private static final List<String> VALIDATE_EXCHANGE_TYPES = Arrays.stream(BuiltinExchangeType.values()).map(BuiltinExchangeType::getType).toList(); static final RabbitMQConfiguration.ManagementCredentials DEFAULT_MANAGEMENT_CREDENTIAL = new RabbitMQConfiguration.ManagementCredentials(DEFAULT_USER, DEFAULT_PASSWORD); public static final String URI_PARAMETER_NAME = "uri"; public static final String EXCHANGE_PARAMETER_NAME = "exchange"; + public static final String EXCHANGE_TYPE_PARAMETER_NAME = "exchange_type"; public static final String ROUTING_KEY_PARAMETER_NAME = "routing_key"; public static final String ATTRIBUTE_PARAMETER_NAME = "attribute"; @@ -103,6 +109,7 @@ public class AmqpForwardAttribute extends GenericMailet { private final MetricFactory metricFactory; private String exchange; + private String exchangeType; private AttributeName attribute; private ConnectionFactory connectionFactory; @VisibleForTesting String routingKey; @@ -142,7 +149,12 @@ public class AmqpForwardAttribute extends GenericMailet { metricFactory, new NoopGaugeRegistry()); reactorRabbitMQChannelPool.start(); sender = reactorRabbitMQChannelPool.getSender(); - sender.declareExchange(ExchangeSpecification.exchange(exchange)).block(); + + ExchangeSpecification exchangeSpecification = Optional.ofNullable(exchangeType) + .map(type -> ExchangeSpecification.exchange(exchange).type(type)) + .orElse(ExchangeSpecification.exchange(exchange)); + + sender.declareExchange(exchangeSpecification).block(); } catch (URISyntaxException e) { throw new RuntimeException(e); } @@ -183,6 +195,12 @@ public class AmqpForwardAttribute extends GenericMailet { throw new MailetException("No value for " + EXCHANGE_PARAMETER_NAME + " parameter was provided."); } + exchangeType = mailetConfig.getInitParameter(EXCHANGE_TYPE_PARAMETER_NAME); + if (StringUtils.isNotEmpty(exchangeType) && !VALIDATE_EXCHANGE_TYPES.contains(exchangeType)) { + throw new MailetException("Invalid value for " + EXCHANGE_TYPE_PARAMETER_NAME + + " parameter was provided: " + exchangeType + ". Valid values are: " + VALIDATE_EXCHANGE_TYPES); + } + routingKey = Optional.ofNullable(mailetConfig.getInitParameter(ROUTING_KEY_PARAMETER_NAME)) .orElse(ROUTING_KEY_DEFAULT_VALUE); String rawAttribute = mailetConfig.getInitParameter(ATTRIBUTE_PARAMETER_NAME); diff --git a/mailet/amqp/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java b/mailet/amqp/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java index e3a6afaf24..3569fe5c5a 100644 --- a/mailet/amqp/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java +++ b/mailet/amqp/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java @@ -21,6 +21,7 @@ package org.apache.james.transport.mailets; import static org.apache.james.transport.mailets.AmqpForwardAttribute.DEFAULT_MANAGEMENT_CREDENTIAL; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -43,6 +44,8 @@ import org.apache.mailet.base.test.FakeMailetConfig; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; class AmqpForwardAttributeTest { @@ -153,6 +156,35 @@ class AmqpForwardAttributeTest { .isInstanceOf(MailetException.class); } + @Test + void initShouldThrowWhenInvalidExchangeType() { + FakeMailetConfig customMailetConfig = FakeMailetConfig.builder() + .mailetName("Test") + .mailetContext(mailetContext) + .setProperty("uri", AMQP_URI) + .setProperty("exchange", EXCHANGE_NAME) + .setProperty("exchange_type", "invalid") + .setProperty("attribute", MAIL_ATTRIBUTE.asString()) + .build(); + assertThatThrownBy(() -> mailet.preInit(customMailetConfig)) + .isInstanceOf(MailetException.class) + .hasMessageContaining("Invalid value for exchange_type parameter was provided: invalid"); + } + + @ParameterizedTest + @ValueSource(strings = {"direct", "fanout", "topic", "headers"}) + void initShouldNotThrowWhenValidExchangeType(String exchangeType) { + assertThatCode(() -> mailet.preInit(FakeMailetConfig.builder() + .mailetName("Test") + .mailetContext(mailetContext) + .setProperty("uri", AMQP_URI) + .setProperty("exchange", EXCHANGE_NAME) + .setProperty("exchange_type", exchangeType) + .setProperty("attribute", MAIL_ATTRIBUTE.asString()) + .build())) + .doesNotThrowAnyException(); + } + @Nested class CreadentialTests { @Test --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
