This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 59e9d8fcdc3ad7d3c05d24ef2972ab3881466810
Author: Tung Tran <[email protected]>
AuthorDate: Wed Sep 18 16:24:33 2024 +0700

    AmqpForwardAttribute mailet supports declaring exchange types.
---
 .../servers/partials/AmqpForwardAttribute.adoc     |  1 +
 .../transport/mailets/AmqpForwardAttribute.java    | 20 +++++++++++++-
 .../mailets/AmqpForwardAttributeTest.java          | 32 ++++++++++++++++++++++
 3 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/docs/modules/servers/partials/AmqpForwardAttribute.adoc 
b/docs/modules/servers/partials/AmqpForwardAttribute.adoc
index 8ce53b575a..46b4e79460 100644
--- a/docs/modules/servers/partials/AmqpForwardAttribute.adoc
+++ b/docs/modules/servers/partials/AmqpForwardAttribute.adoc
@@ -9,6 +9,7 @@ where the byte[] content is issued from a MimeBodyPart.
 It is typically generated from the StripAttachment mailet.
 * uri (mandatory): AMQP URI defining the server where to send the attachment.
 * exchange (mandatory): name of the AMQP exchange.
+* exchange_type (optional, default to "direct"): type of the exchange. Can be 
"direct", "fanout", "topic", "headers".
 * routing_key (optional, default to empty string): name of the routing key on 
this exchange.
 
 This mailet will sent the data attached to the mail as an attribute holding a 
map.
\ No newline at end of file
diff --git 
a/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
 
b/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
index 1a8198dba2..5c0dc09a77 100644
--- 
a/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
+++ 
b/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
@@ -23,6 +23,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -31,6 +32,7 @@ import java.util.stream.Stream;
 import jakarta.annotation.PreDestroy;
 import jakarta.inject.Inject;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.RabbitMQConnectionFactory;
 import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
@@ -55,6 +57,7 @@ import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.rabbitmq.client.AlreadyClosedException;
+import com.rabbitmq.client.BuiltinExchangeType;
 import com.rabbitmq.client.ConnectionFactory;
 
 import reactor.core.publisher.Flux;
@@ -72,6 +75,7 @@ import reactor.rabbitmq.Sender;
  * It is typically generated from the StripAttachment mailet.</li>
  * <li>uri (mandatory): AMQP URI defining the server where to send the 
attachment.</li>
  * <li>exchange (mandatory): name of the AMQP exchange.</li>
+ * <li>exchange_type (optional, default to "direct"): type of the exchange. 
Valid values are: direct, fanout, topic, headers.</li>
  * <li>routing_key (optional, default to empty string): name of the routing 
key on this exchange.</li>
  * </ul>
  *
@@ -90,11 +94,13 @@ public class AmqpForwardAttribute extends GenericMailet {
     private static final String DEFAULT_USER = "guest";
     private static final String DEFAULT_PASSWORD_STRING = "guest";
     private static final char[] DEFAULT_PASSWORD = 
DEFAULT_PASSWORD_STRING.toCharArray();
+    private static final List<String> VALIDATE_EXCHANGE_TYPES = 
Arrays.stream(BuiltinExchangeType.values()).map(BuiltinExchangeType::getType).toList();
     static final RabbitMQConfiguration.ManagementCredentials 
DEFAULT_MANAGEMENT_CREDENTIAL = new 
RabbitMQConfiguration.ManagementCredentials(DEFAULT_USER, DEFAULT_PASSWORD);
 
 
     public static final String URI_PARAMETER_NAME = "uri";
     public static final String EXCHANGE_PARAMETER_NAME = "exchange";
+    public static final String EXCHANGE_TYPE_PARAMETER_NAME = "exchange_type";
     public static final String ROUTING_KEY_PARAMETER_NAME = "routing_key";
     public static final String ATTRIBUTE_PARAMETER_NAME = "attribute";
 
@@ -103,6 +109,7 @@ public class AmqpForwardAttribute extends GenericMailet {
     private final MetricFactory metricFactory;
 
     private String exchange;
+    private String exchangeType;
     private AttributeName attribute;
     private ConnectionFactory connectionFactory;
     @VisibleForTesting String routingKey;
@@ -142,7 +149,12 @@ public class AmqpForwardAttribute extends GenericMailet {
                 metricFactory, new NoopGaugeRegistry());
             reactorRabbitMQChannelPool.start();
             sender = reactorRabbitMQChannelPool.getSender();
-            
sender.declareExchange(ExchangeSpecification.exchange(exchange)).block();
+
+            ExchangeSpecification exchangeSpecification = 
Optional.ofNullable(exchangeType)
+                .map(type -> 
ExchangeSpecification.exchange(exchange).type(type))
+                .orElse(ExchangeSpecification.exchange(exchange));
+
+            sender.declareExchange(exchangeSpecification).block();
         } catch (URISyntaxException e) {
             throw new RuntimeException(e);
         }
@@ -183,6 +195,12 @@ public class AmqpForwardAttribute extends GenericMailet {
             throw new MailetException("No value for " + EXCHANGE_PARAMETER_NAME
                     + " parameter was provided.");
         }
+        exchangeType = 
mailetConfig.getInitParameter(EXCHANGE_TYPE_PARAMETER_NAME);
+        if (StringUtils.isNotEmpty(exchangeType) && 
!VALIDATE_EXCHANGE_TYPES.contains(exchangeType)) {
+            throw new MailetException("Invalid value for " + 
EXCHANGE_TYPE_PARAMETER_NAME
+                + " parameter was provided: " + exchangeType + ". Valid values 
are: " + VALIDATE_EXCHANGE_TYPES);
+        }
+
         routingKey = 
Optional.ofNullable(mailetConfig.getInitParameter(ROUTING_KEY_PARAMETER_NAME))
             .orElse(ROUTING_KEY_DEFAULT_VALUE);
         String rawAttribute = 
mailetConfig.getInitParameter(ATTRIBUTE_PARAMETER_NAME);
diff --git 
a/mailet/amqp/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java
 
b/mailet/amqp/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java
index e3a6afaf24..3569fe5c5a 100644
--- 
a/mailet/amqp/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java
+++ 
b/mailet/amqp/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java
@@ -21,6 +21,7 @@ package org.apache.james.transport.mailets;
 
 import static 
org.apache.james.transport.mailets.AmqpForwardAttribute.DEFAULT_MANAGEMENT_CREDENTIAL;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -43,6 +44,8 @@ import org.apache.mailet.base.test.FakeMailetConfig;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 
 class AmqpForwardAttributeTest {
@@ -153,6 +156,35 @@ class AmqpForwardAttributeTest {
             .isInstanceOf(MailetException.class);
     }
 
+    @Test
+    void initShouldThrowWhenInvalidExchangeType() {
+        FakeMailetConfig customMailetConfig = FakeMailetConfig.builder()
+            .mailetName("Test")
+            .mailetContext(mailetContext)
+            .setProperty("uri", AMQP_URI)
+            .setProperty("exchange", EXCHANGE_NAME)
+            .setProperty("exchange_type", "invalid")
+            .setProperty("attribute", MAIL_ATTRIBUTE.asString())
+            .build();
+        assertThatThrownBy(() -> mailet.preInit(customMailetConfig))
+            .isInstanceOf(MailetException.class)
+            .hasMessageContaining("Invalid value for exchange_type parameter 
was provided: invalid");
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"direct", "fanout", "topic", "headers"})
+    void initShouldNotThrowWhenValidExchangeType(String exchangeType) {
+        assertThatCode(() -> mailet.preInit(FakeMailetConfig.builder()
+            .mailetName("Test")
+            .mailetContext(mailetContext)
+            .setProperty("uri", AMQP_URI)
+            .setProperty("exchange", EXCHANGE_NAME)
+            .setProperty("exchange_type", exchangeType)
+            .setProperty("attribute", MAIL_ATTRIBUTE.asString())
+            .build()))
+            .doesNotThrowAnyException();
+    }
+
     @Nested
     class CreadentialTests {
         @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to