MAILET-143 Add AmqpForwardAttribute mailet which forward attribute values to 
RabbitMQ via AMQP prococol


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/13b00ee9
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/13b00ee9
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/13b00ee9

Branch: refs/heads/master
Commit: 13b00ee9e1a07b2060b488e10701787ed7835f69
Parents: f779a3c
Author: Raphael Ouazana <raphael.ouaz...@linagora.com>
Authored: Thu Dec 15 17:04:56 2016 +0100
Committer: Raphael Ouazana <raphael.ouaz...@linagora.com>
Committed: Wed Dec 21 18:31:07 2016 +0100

----------------------------------------------------------------------
 .../mailet/base/test/FakeMailContext.java       |  20 +-
 mailet/pom.xml                                  |   5 +
 mailet/standard/pom.xml                         |   4 +
 .../transport/mailets/AmqpForwardAttribute.java | 167 +++++++++++++
 .../mailets/AmqpForwardAttributeTest.java       | 237 +++++++++++++++++++
 .../james/transport/mailets/LogMessageTest.java |   8 +-
 6 files changed, 435 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/13b00ee9/mailet/base/src/test/java/org/apache/mailet/base/test/FakeMailContext.java
----------------------------------------------------------------------
diff --git 
a/mailet/base/src/test/java/org/apache/mailet/base/test/FakeMailContext.java 
b/mailet/base/src/test/java/org/apache/mailet/base/test/FakeMailContext.java
index d6a4516..6296e94 100644
--- a/mailet/base/src/test/java/org/apache/mailet/base/test/FakeMailContext.java
+++ b/mailet/base/src/test/java/org/apache/mailet/base/test/FakeMailContext.java
@@ -344,8 +344,24 @@ public class FakeMailContext implements MailetContext {
     }
 
     public void log(LogLevel level, String message, Throwable t) {
-        log(level, message);
-        log(level, t.getMessage());
+        if (logger.isPresent()) {
+            switch (level) {
+            case INFO:
+                logger.get().info(message, t);
+                break;
+            case WARN:
+                logger.get().warn(message, t);
+                break;
+            case ERROR:
+                logger.get().error(message, t);
+                break;
+            default:
+                logger.get().debug(message, t);
+            }
+        } else {
+            System.out.println("[" + level + "]" + message);
+            t.printStackTrace(System.out);
+        }
     }
 
     public List<String> dnsLookup(String name, RecordType type) throws 
LookupException {

http://git-wip-us.apache.org/repos/asf/james-project/blob/13b00ee9/mailet/pom.xml
----------------------------------------------------------------------
diff --git a/mailet/pom.xml b/mailet/pom.xml
index 368b27f..e2dd161 100644
--- a/mailet/pom.xml
+++ b/mailet/pom.xml
@@ -87,6 +87,11 @@
                 <version>${mime4j.version}</version>
             </dependency>
             <dependency>
+                <groupId>com.rabbitmq</groupId>
+                <artifactId>amqp-client</artifactId>
+                <version>4.0.0</version>
+            </dependency>
+            <dependency>
                 <groupId>javax.mail</groupId>
                 <artifactId>mail</artifactId>
                 <version>${javax.version}</version>

http://git-wip-us.apache.org/repos/asf/james-project/blob/13b00ee9/mailet/standard/pom.xml
----------------------------------------------------------------------
diff --git a/mailet/standard/pom.xml b/mailet/standard/pom.xml
index 892ca71..9b99bc3 100644
--- a/mailet/standard/pom.xml
+++ b/mailet/standard/pom.xml
@@ -55,6 +55,10 @@
             <artifactId>guava</artifactId>
         </dependency>
         <dependency>
+            <groupId>com.rabbitmq</groupId>
+            <artifactId>amqp-client</artifactId>
+        </dependency>
+        <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/13b00ee9/mailet/standard/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
----------------------------------------------------------------------
diff --git 
a/mailet/standard/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
 
b/mailet/standard/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
new file mode 100644
index 0000000..4596c7c
--- /dev/null
+++ 
b/mailet/standard/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
@@ -0,0 +1,167 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.transport.mailets;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import javax.mail.MessagingException;
+import javax.mail.internet.MimeBodyPart;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.mailet.Mail;
+import org.apache.mailet.MailetException;
+import org.apache.mailet.base.GenericMailet;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+/**
+ * This mailet forwards the attributes values to a AMPQ.
+ * <br />
+ * It takes 4 parameters:
+ * <ul>
+ * <li>attribute (mandatory): content to be forwarded, expected to be a 
Map&lt;String, byte[]&gt;
+ * where the byte[] content is issued from a MimeBodyPart.
+ * It is typically generated from the StripAttachment mailet.</li>
+ * <li>uri (mandatory): AMQP URI defining the server where to send the 
attachment.</li>
+ * <li>exchange (mandatory): name of the AMQP exchange.</li>
+ * <li>routing_key (optional, default to empty string): name of the routing 
key on this exchange.</li>
+ * </ul>
+ *
+ * This mailet will extract the attachment content from the MimeBodyPart 
byte[] before
+ * sending it.
+ */
+public class AmqpForwardAttribute extends GenericMailet {
+
+    public static final String URI_PARAMETER_NAME = "uri";
+    public static final String EXCHANGE_PARAMETER_NAME = "exchange";
+    public static final String ROUTING_KEY_PARAMETER_NAME = "routing_key";
+    public static final String ATTRIBUTE_PARAMETER_NAME = "attribute";
+
+    public static final String ROUTING_KEY_DEFAULT_VALUE = "";
+
+    private String exchange;
+    private String attribute;
+    private ConnectionFactory connectionFactory;
+    @VisibleForTesting String routingKey;
+
+    public void init() throws MailetException {
+        String uri = getInitParameter(URI_PARAMETER_NAME);
+        if (Strings.isNullOrEmpty(uri)) {
+            throw new MailetException("No value for " + URI_PARAMETER_NAME
+                    + " parameter was provided.");
+        }
+        exchange = getInitParameter(EXCHANGE_PARAMETER_NAME);
+        if (Strings.isNullOrEmpty(exchange)) {
+            throw new MailetException("No value for " + EXCHANGE_PARAMETER_NAME
+                    + " parameter was provided.");
+        }
+        routingKey = getInitParameter(ROUTING_KEY_PARAMETER_NAME, 
ROUTING_KEY_DEFAULT_VALUE);
+        attribute = getInitParameter(ATTRIBUTE_PARAMETER_NAME);
+        if (Strings.isNullOrEmpty(attribute)) {
+            throw new MailetException("No value for " + 
ATTRIBUTE_PARAMETER_NAME
+                    + " parameter was provided.");
+        }
+        connectionFactory = new ConnectionFactory();
+        try {
+            connectionFactory.setUri(uri);
+        } catch (Exception e) {
+            throw new MailetException("Invalid " + URI_PARAMETER_NAME
+                    + " parameter was provided: " + uri, e);
+        }
+    }
+
+    @VisibleForTesting void setConnectionFactory(ConnectionFactory 
connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+    public void service(Mail mail) throws MailetException {
+        if (mail.getAttribute(attribute) == null) {
+            return;
+        }
+        Map<String, byte[]> content = getAttributeContent(mail);
+        try {
+            sendContent(content);
+        } catch (IOException e) {
+            log("IOException while writing to AMQP: " + e.getMessage(), e);
+        } catch (TimeoutException e) {
+            log("TimeoutException while writing to AMQP: " + e.getMessage(), 
e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, byte[]> getAttributeContent(Mail mail) throws 
MailetException {
+        Serializable attributeContent = mail.getAttribute(attribute);
+        if (! (attributeContent instanceof Map)) {
+            throw new MailetException("Invalid attribute found into attribute "
+                    + attribute + "class Map expected but "
+                    + attributeContent.getClass() + " found.");
+        }
+        return (Map<String, byte[]>) attributeContent;
+    }
+
+    private void sendContent(Map<String, byte[]> content) throws IOException, 
TimeoutException {
+        Connection connection = null;
+        Channel channel = null;
+        try {
+            connection = connectionFactory.newConnection();
+            channel = connection.createChannel();
+            channel.exchangeDeclarePassive(exchange);
+            sendContentOnChannel(channel, content);
+        } finally {
+            if (channel != null) {
+                channel.close();
+            }
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+    private void sendContentOnChannel(Channel channel, Map<String, byte[]> 
content) throws IOException {
+        for (Map.Entry<String, byte[]> entry: content.entrySet()) {
+            byte[] rawMime = entry.getValue();
+            byte[] attachmentContent = extractContent(rawMime);
+            channel.basicPublish(exchange, routingKey, new 
AMQP.BasicProperties(), attachmentContent);
+        }
+    }
+
+    private byte[] extractContent(byte[] rawMime) throws IOException {
+        try {
+            MimeBodyPart mimeBodyPart = new MimeBodyPart(new 
ByteArrayInputStream(rawMime));
+            return IOUtils.toByteArray(mimeBodyPart.getInputStream());
+        } catch (MessagingException e) {
+            throw new IOException(e);
+        }
+    }
+
+    public String getMailetInfo() {
+        return "AmqpForwardAttribute";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/13b00ee9/mailet/standard/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java
----------------------------------------------------------------------
diff --git 
a/mailet/standard/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java
 
b/mailet/standard/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java
new file mode 100644
index 0000000..5507b5e
--- /dev/null
+++ 
b/mailet/standard/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java
@@ -0,0 +1,237 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.transport.mailets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import javax.mail.MessagingException;
+
+import org.apache.mailet.Mail;
+import org.apache.mailet.MailetContext;
+import org.apache.mailet.MailetException;
+import org.apache.mailet.base.test.FakeMailContext;
+import org.apache.mailet.base.test.FakeMailetConfig;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.ArgumentCaptor;
+import org.slf4j.Logger;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.primitives.Bytes;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.AMQP.BasicProperties;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+public class AmqpForwardAttributeTest {
+
+    private static final String MAIL_ATTRIBUTE = "ampq.attachments";
+    private static final String EXCHANGE_NAME = "exchangeName";
+    private static final String ROUTING_KEY = "routingKey";
+    private static final String AMQP_URI = "amqp://host";
+    private static final byte[] ATTACHMENT_HEADER = 
"Content-Transfer-Encoding: 8bit\r\nContent-Type: application/octet-stream; 
charset=utf-8\r\n\r\n".getBytes(Charsets.UTF_8);
+    private static final byte[] ATTACHMENT_CONTENT = "Attachment 
content".getBytes(Charsets.UTF_8);
+    private static final ImmutableMap<String, byte[]> ATTRIBUTE_CONTENT = 
ImmutableMap.of("attachment1.txt", Bytes.concat(ATTACHMENT_HEADER, 
ATTACHMENT_CONTENT));
+
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    private AmqpForwardAttribute mailet;
+    private Logger logger;
+    private MailetContext mailetContext;
+    private FakeMailetConfig mailetConfig;
+
+    @Before
+    public void setUp() throws Exception {
+        mailet = new AmqpForwardAttribute();
+        logger = mock(Logger.class);
+        mailetContext = FakeMailContext.builder()
+                .logger(logger)
+                .build();
+        mailetConfig = FakeMailetConfig.builder()
+                .mailetName("Test")
+                .mailetContext(mailetContext)
+                .setProperty("uri", AMQP_URI)
+                .setProperty("exchange", EXCHANGE_NAME)
+                .setProperty("routing_key", ROUTING_KEY)
+                .setProperty("attribute", MAIL_ATTRIBUTE)
+                .build();
+    }
+
+    @Test
+    public void initShouldThrowWhenNoUriParameter() throws MessagingException {
+        FakeMailetConfig customMailetConfig = FakeMailetConfig.builder()
+                .mailetName("Test")
+                .mailetContext(mailetContext)
+                .build();
+        expectedException.expect(MailetException.class);
+        mailet.init(customMailetConfig);
+    }
+
+    @Test
+    public void initShouldThrowWhenNoExchangeParameter() throws 
MessagingException {
+        FakeMailetConfig customMailetConfig = FakeMailetConfig.builder()
+                .mailetName("Test")
+                .mailetContext(mailetContext)
+                .setProperty("uri", AMQP_URI)
+                .build();
+        expectedException.expect(MailetException.class);
+        mailet.init(customMailetConfig);
+    }
+
+    @Test
+    public void initShouldThrowWhenNoAttributeParameter() throws 
MessagingException {
+        FakeMailetConfig customMailetConfig = FakeMailetConfig.builder()
+                .mailetName("Test")
+                .mailetContext(mailetContext)
+                .setProperty("uri", AMQP_URI)
+                .setProperty("exchange", EXCHANGE_NAME)
+                .build();
+        expectedException.expect(MailetException.class);
+        mailet.init(customMailetConfig);
+    }
+
+    @Test
+    public void initShouldThrowWhenInvalidUri() throws MessagingException {
+        FakeMailetConfig customMailetConfig = FakeMailetConfig.builder()
+                .mailetName("Test")
+                .mailetContext(mailetContext)
+                .setProperty("uri", "bad-uri")
+                .setProperty("exchange", EXCHANGE_NAME)
+                .setProperty("attribute", MAIL_ATTRIBUTE)
+                .build();
+        expectedException.expect(MailetException.class);
+        mailet.init(customMailetConfig);
+    }
+
+    @Test
+    public void getMailetInfoShouldReturnInfo() {
+        assertThat(mailet.getMailetInfo()).isEqualTo("AmqpForwardAttribute");
+    }
+
+    @Test
+    public void 
initShouldIntializeEmptyRoutingKeyWhenAllParametersButRoutingKey() throws 
MessagingException {
+        FakeMailetConfig customMailetConfig = FakeMailetConfig.builder()
+                .mailetName("Test")
+                .mailetContext(mailetContext)
+                .setProperty("uri", AMQP_URI)
+                .setProperty("exchange", EXCHANGE_NAME)
+                .setProperty("attribute", MAIL_ATTRIBUTE)
+                .build();
+        mailet.init(customMailetConfig);
+
+        assertThat(mailet.routingKey).isEmpty();
+    }
+
+    @Test
+    public void initShouldNotThrowWithAllParameters() throws 
MessagingException {
+        mailet.init(mailetConfig);
+    }
+
+    @Test
+    public void serviceShouldNotUseConnectionWhenNoAttributeInMail() throws 
Exception {
+        mailet.init(mailetConfig);
+        Connection connection = mock(Connection.class);
+        ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
+        when(connectionFactory.newConnection()).thenReturn(connection);
+        mailet.setConnectionFactory(connectionFactory);
+        Mail mail = mock(Mail.class);
+        when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(null);
+
+        mailet.service(mail);
+
+        verifyZeroInteractions(connection);
+    }
+
+    @Test
+    public void serviceShouldThrowWhenAttributeContentIsNotAMap() throws 
MessagingException {
+        mailet.init(mailetConfig);
+        Mail mail = mock(Mail.class);
+        when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(ImmutableList.of());
+
+        expectedException.expect(MailetException.class);
+
+        mailet.service(mail);
+    }
+
+    @Test
+    public void serviceShouldLogWhenTimeoutException() throws Exception {
+        mailet.init(mailetConfig);
+        Mail mail = mock(Mail.class);
+        when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(ATTRIBUTE_CONTENT);
+        ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
+        TimeoutException expectedLoggedException = new TimeoutException();
+        
when(connectionFactory.newConnection()).thenThrow(expectedLoggedException);
+        mailet.setConnectionFactory(connectionFactory);
+
+        mailet.service(mail);
+
+        verify(logger).error(any(String.class), eq(expectedLoggedException));
+    }
+
+    @Test
+    public void serviceShouldLogWhenIOException() throws Exception {
+        mailet.init(mailetConfig);
+        Mail mail = mock(Mail.class);
+        when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(ATTRIBUTE_CONTENT);
+        ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
+        IOException expectedLoggedException = new IOException();
+        
when(connectionFactory.newConnection()).thenThrow(expectedLoggedException);
+        mailet.setConnectionFactory(connectionFactory);
+
+        mailet.service(mail);
+
+        verify(logger).error(any(String.class), eq(expectedLoggedException));
+    }
+
+    @Test
+    public void serviceShouldPublishAttributeContentWhenAttributeInMail() 
throws Exception {
+        mailet.init(mailetConfig);
+        Channel channel = mock(Channel.class);
+        Connection connection = mock(Connection.class);
+        when(connection.createChannel()).thenReturn(channel);
+        ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
+        when(connectionFactory.newConnection()).thenReturn(connection);
+        mailet.setConnectionFactory(connectionFactory);
+        Mail mail = mock(Mail.class);
+        when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(ATTRIBUTE_CONTENT);
+        BasicProperties expectedProperties = new AMQP.BasicProperties();
+
+        mailet.service(mail);
+
+        ArgumentCaptor<BasicProperties> basicPropertiesCaptor = 
ArgumentCaptor.forClass(BasicProperties.class);
+        verify(channel).basicPublish(eq(EXCHANGE_NAME), eq(ROUTING_KEY), 
basicPropertiesCaptor.capture(), eq(ATTACHMENT_CONTENT));
+        
assertThat(basicPropertiesCaptor.getValue()).isEqualToComparingFieldByField(expectedProperties);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/13b00ee9/mailet/standard/src/test/java/org/apache/james/transport/mailets/LogMessageTest.java
----------------------------------------------------------------------
diff --git 
a/mailet/standard/src/test/java/org/apache/james/transport/mailets/LogMessageTest.java
 
b/mailet/standard/src/test/java/org/apache/james/transport/mailets/LogMessageTest.java
index b037d33..487bce7 100644
--- 
a/mailet/standard/src/test/java/org/apache/james/transport/mailets/LogMessageTest.java
+++ 
b/mailet/standard/src/test/java/org/apache/james/transport/mailets/LogMessageTest.java
@@ -20,6 +20,8 @@
 package org.apache.james.transport.mailets;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -98,8 +100,7 @@ public class LogMessageTest {
         verify(logger).info("Logging mail null");
         verify(logger).info("\n");
         verify(logger).info("Subject: subject\n");
-        verify(logger).error("Error logging message.");
-        verify(logger).error("No MimeMessage content");
+        verify(logger).error(eq("Error logging message."), 
any(MessagingException.class));
         verifyNoMoreInteractions(logger);
     }
 
@@ -139,8 +140,7 @@ public class LogMessageTest {
         mailet.service(mail);
 
         verify(logger).info("Logging mail name");
-        verify(logger).error("Error logging message.");
-        verify(logger).error("exception message");
+        verify(logger).error("Error logging message.", messagingException);
         verifyNoMoreInteractions(logger);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to