MAILET-143 Add AmqpForwardAttribute mailet which forward attribute values to RabbitMQ via AMQP prococol
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/13b00ee9 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/13b00ee9 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/13b00ee9 Branch: refs/heads/master Commit: 13b00ee9e1a07b2060b488e10701787ed7835f69 Parents: f779a3c Author: Raphael Ouazana <raphael.ouaz...@linagora.com> Authored: Thu Dec 15 17:04:56 2016 +0100 Committer: Raphael Ouazana <raphael.ouaz...@linagora.com> Committed: Wed Dec 21 18:31:07 2016 +0100 ---------------------------------------------------------------------- .../mailet/base/test/FakeMailContext.java | 20 +- mailet/pom.xml | 5 + mailet/standard/pom.xml | 4 + .../transport/mailets/AmqpForwardAttribute.java | 167 +++++++++++++ .../mailets/AmqpForwardAttributeTest.java | 237 +++++++++++++++++++ .../james/transport/mailets/LogMessageTest.java | 8 +- 6 files changed, 435 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/13b00ee9/mailet/base/src/test/java/org/apache/mailet/base/test/FakeMailContext.java ---------------------------------------------------------------------- diff --git a/mailet/base/src/test/java/org/apache/mailet/base/test/FakeMailContext.java b/mailet/base/src/test/java/org/apache/mailet/base/test/FakeMailContext.java index d6a4516..6296e94 100644 --- a/mailet/base/src/test/java/org/apache/mailet/base/test/FakeMailContext.java +++ b/mailet/base/src/test/java/org/apache/mailet/base/test/FakeMailContext.java @@ -344,8 +344,24 @@ public class FakeMailContext implements MailetContext { } public void log(LogLevel level, String message, Throwable t) { - log(level, message); - log(level, t.getMessage()); + if (logger.isPresent()) { + switch (level) { + case INFO: + logger.get().info(message, t); + break; + case WARN: + logger.get().warn(message, t); + break; + case ERROR: + logger.get().error(message, t); + break; + default: + logger.get().debug(message, t); + } + } else { + System.out.println("[" + level + "]" + message); + t.printStackTrace(System.out); + } } public List<String> dnsLookup(String name, RecordType type) throws LookupException { http://git-wip-us.apache.org/repos/asf/james-project/blob/13b00ee9/mailet/pom.xml ---------------------------------------------------------------------- diff --git a/mailet/pom.xml b/mailet/pom.xml index 368b27f..e2dd161 100644 --- a/mailet/pom.xml +++ b/mailet/pom.xml @@ -87,6 +87,11 @@ <version>${mime4j.version}</version> </dependency> <dependency> + <groupId>com.rabbitmq</groupId> + <artifactId>amqp-client</artifactId> + <version>4.0.0</version> + </dependency> + <dependency> <groupId>javax.mail</groupId> <artifactId>mail</artifactId> <version>${javax.version}</version> http://git-wip-us.apache.org/repos/asf/james-project/blob/13b00ee9/mailet/standard/pom.xml ---------------------------------------------------------------------- diff --git a/mailet/standard/pom.xml b/mailet/standard/pom.xml index 892ca71..9b99bc3 100644 --- a/mailet/standard/pom.xml +++ b/mailet/standard/pom.xml @@ -55,6 +55,10 @@ <artifactId>guava</artifactId> </dependency> <dependency> + <groupId>com.rabbitmq</groupId> + <artifactId>amqp-client</artifactId> + </dependency> + <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/james-project/blob/13b00ee9/mailet/standard/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java ---------------------------------------------------------------------- diff --git a/mailet/standard/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java b/mailet/standard/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java new file mode 100644 index 0000000..4596c7c --- /dev/null +++ b/mailet/standard/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java @@ -0,0 +1,167 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.transport.mailets; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +import javax.mail.MessagingException; +import javax.mail.internet.MimeBodyPart; + +import org.apache.commons.io.IOUtils; +import org.apache.mailet.Mail; +import org.apache.mailet.MailetException; +import org.apache.mailet.base.GenericMailet; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +/** + * This mailet forwards the attributes values to a AMPQ. + * <br /> + * It takes 4 parameters: + * <ul> + * <li>attribute (mandatory): content to be forwarded, expected to be a Map<String, byte[]> + * where the byte[] content is issued from a MimeBodyPart. + * It is typically generated from the StripAttachment mailet.</li> + * <li>uri (mandatory): AMQP URI defining the server where to send the attachment.</li> + * <li>exchange (mandatory): name of the AMQP exchange.</li> + * <li>routing_key (optional, default to empty string): name of the routing key on this exchange.</li> + * </ul> + * + * This mailet will extract the attachment content from the MimeBodyPart byte[] before + * sending it. + */ +public class AmqpForwardAttribute extends GenericMailet { + + public static final String URI_PARAMETER_NAME = "uri"; + public static final String EXCHANGE_PARAMETER_NAME = "exchange"; + public static final String ROUTING_KEY_PARAMETER_NAME = "routing_key"; + public static final String ATTRIBUTE_PARAMETER_NAME = "attribute"; + + public static final String ROUTING_KEY_DEFAULT_VALUE = ""; + + private String exchange; + private String attribute; + private ConnectionFactory connectionFactory; + @VisibleForTesting String routingKey; + + public void init() throws MailetException { + String uri = getInitParameter(URI_PARAMETER_NAME); + if (Strings.isNullOrEmpty(uri)) { + throw new MailetException("No value for " + URI_PARAMETER_NAME + + " parameter was provided."); + } + exchange = getInitParameter(EXCHANGE_PARAMETER_NAME); + if (Strings.isNullOrEmpty(exchange)) { + throw new MailetException("No value for " + EXCHANGE_PARAMETER_NAME + + " parameter was provided."); + } + routingKey = getInitParameter(ROUTING_KEY_PARAMETER_NAME, ROUTING_KEY_DEFAULT_VALUE); + attribute = getInitParameter(ATTRIBUTE_PARAMETER_NAME); + if (Strings.isNullOrEmpty(attribute)) { + throw new MailetException("No value for " + ATTRIBUTE_PARAMETER_NAME + + " parameter was provided."); + } + connectionFactory = new ConnectionFactory(); + try { + connectionFactory.setUri(uri); + } catch (Exception e) { + throw new MailetException("Invalid " + URI_PARAMETER_NAME + + " parameter was provided: " + uri, e); + } + } + + @VisibleForTesting void setConnectionFactory(ConnectionFactory connectionFactory) { + this.connectionFactory = connectionFactory; + } + + public void service(Mail mail) throws MailetException { + if (mail.getAttribute(attribute) == null) { + return; + } + Map<String, byte[]> content = getAttributeContent(mail); + try { + sendContent(content); + } catch (IOException e) { + log("IOException while writing to AMQP: " + e.getMessage(), e); + } catch (TimeoutException e) { + log("TimeoutException while writing to AMQP: " + e.getMessage(), e); + } + } + + @SuppressWarnings("unchecked") + private Map<String, byte[]> getAttributeContent(Mail mail) throws MailetException { + Serializable attributeContent = mail.getAttribute(attribute); + if (! (attributeContent instanceof Map)) { + throw new MailetException("Invalid attribute found into attribute " + + attribute + "class Map expected but " + + attributeContent.getClass() + " found."); + } + return (Map<String, byte[]>) attributeContent; + } + + private void sendContent(Map<String, byte[]> content) throws IOException, TimeoutException { + Connection connection = null; + Channel channel = null; + try { + connection = connectionFactory.newConnection(); + channel = connection.createChannel(); + channel.exchangeDeclarePassive(exchange); + sendContentOnChannel(channel, content); + } finally { + if (channel != null) { + channel.close(); + } + if (connection != null) { + connection.close(); + } + } + } + + private void sendContentOnChannel(Channel channel, Map<String, byte[]> content) throws IOException { + for (Map.Entry<String, byte[]> entry: content.entrySet()) { + byte[] rawMime = entry.getValue(); + byte[] attachmentContent = extractContent(rawMime); + channel.basicPublish(exchange, routingKey, new AMQP.BasicProperties(), attachmentContent); + } + } + + private byte[] extractContent(byte[] rawMime) throws IOException { + try { + MimeBodyPart mimeBodyPart = new MimeBodyPart(new ByteArrayInputStream(rawMime)); + return IOUtils.toByteArray(mimeBodyPart.getInputStream()); + } catch (MessagingException e) { + throw new IOException(e); + } + } + + public String getMailetInfo() { + return "AmqpForwardAttribute"; + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/13b00ee9/mailet/standard/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java ---------------------------------------------------------------------- diff --git a/mailet/standard/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java b/mailet/standard/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java new file mode 100644 index 0000000..5507b5e --- /dev/null +++ b/mailet/standard/src/test/java/org/apache/james/transport/mailets/AmqpForwardAttributeTest.java @@ -0,0 +1,237 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.transport.mailets; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import javax.mail.MessagingException; + +import org.apache.mailet.Mail; +import org.apache.mailet.MailetContext; +import org.apache.mailet.MailetException; +import org.apache.mailet.base.test.FakeMailContext; +import org.apache.mailet.base.test.FakeMailetConfig; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.ArgumentCaptor; +import org.slf4j.Logger; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.primitives.Bytes; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +public class AmqpForwardAttributeTest { + + private static final String MAIL_ATTRIBUTE = "ampq.attachments"; + private static final String EXCHANGE_NAME = "exchangeName"; + private static final String ROUTING_KEY = "routingKey"; + private static final String AMQP_URI = "amqp://host"; + private static final byte[] ATTACHMENT_HEADER = "Content-Transfer-Encoding: 8bit\r\nContent-Type: application/octet-stream; charset=utf-8\r\n\r\n".getBytes(Charsets.UTF_8); + private static final byte[] ATTACHMENT_CONTENT = "Attachment content".getBytes(Charsets.UTF_8); + private static final ImmutableMap<String, byte[]> ATTRIBUTE_CONTENT = ImmutableMap.of("attachment1.txt", Bytes.concat(ATTACHMENT_HEADER, ATTACHMENT_CONTENT)); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private AmqpForwardAttribute mailet; + private Logger logger; + private MailetContext mailetContext; + private FakeMailetConfig mailetConfig; + + @Before + public void setUp() throws Exception { + mailet = new AmqpForwardAttribute(); + logger = mock(Logger.class); + mailetContext = FakeMailContext.builder() + .logger(logger) + .build(); + mailetConfig = FakeMailetConfig.builder() + .mailetName("Test") + .mailetContext(mailetContext) + .setProperty("uri", AMQP_URI) + .setProperty("exchange", EXCHANGE_NAME) + .setProperty("routing_key", ROUTING_KEY) + .setProperty("attribute", MAIL_ATTRIBUTE) + .build(); + } + + @Test + public void initShouldThrowWhenNoUriParameter() throws MessagingException { + FakeMailetConfig customMailetConfig = FakeMailetConfig.builder() + .mailetName("Test") + .mailetContext(mailetContext) + .build(); + expectedException.expect(MailetException.class); + mailet.init(customMailetConfig); + } + + @Test + public void initShouldThrowWhenNoExchangeParameter() throws MessagingException { + FakeMailetConfig customMailetConfig = FakeMailetConfig.builder() + .mailetName("Test") + .mailetContext(mailetContext) + .setProperty("uri", AMQP_URI) + .build(); + expectedException.expect(MailetException.class); + mailet.init(customMailetConfig); + } + + @Test + public void initShouldThrowWhenNoAttributeParameter() throws MessagingException { + FakeMailetConfig customMailetConfig = FakeMailetConfig.builder() + .mailetName("Test") + .mailetContext(mailetContext) + .setProperty("uri", AMQP_URI) + .setProperty("exchange", EXCHANGE_NAME) + .build(); + expectedException.expect(MailetException.class); + mailet.init(customMailetConfig); + } + + @Test + public void initShouldThrowWhenInvalidUri() throws MessagingException { + FakeMailetConfig customMailetConfig = FakeMailetConfig.builder() + .mailetName("Test") + .mailetContext(mailetContext) + .setProperty("uri", "bad-uri") + .setProperty("exchange", EXCHANGE_NAME) + .setProperty("attribute", MAIL_ATTRIBUTE) + .build(); + expectedException.expect(MailetException.class); + mailet.init(customMailetConfig); + } + + @Test + public void getMailetInfoShouldReturnInfo() { + assertThat(mailet.getMailetInfo()).isEqualTo("AmqpForwardAttribute"); + } + + @Test + public void initShouldIntializeEmptyRoutingKeyWhenAllParametersButRoutingKey() throws MessagingException { + FakeMailetConfig customMailetConfig = FakeMailetConfig.builder() + .mailetName("Test") + .mailetContext(mailetContext) + .setProperty("uri", AMQP_URI) + .setProperty("exchange", EXCHANGE_NAME) + .setProperty("attribute", MAIL_ATTRIBUTE) + .build(); + mailet.init(customMailetConfig); + + assertThat(mailet.routingKey).isEmpty(); + } + + @Test + public void initShouldNotThrowWithAllParameters() throws MessagingException { + mailet.init(mailetConfig); + } + + @Test + public void serviceShouldNotUseConnectionWhenNoAttributeInMail() throws Exception { + mailet.init(mailetConfig); + Connection connection = mock(Connection.class); + ConnectionFactory connectionFactory = mock(ConnectionFactory.class); + when(connectionFactory.newConnection()).thenReturn(connection); + mailet.setConnectionFactory(connectionFactory); + Mail mail = mock(Mail.class); + when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(null); + + mailet.service(mail); + + verifyZeroInteractions(connection); + } + + @Test + public void serviceShouldThrowWhenAttributeContentIsNotAMap() throws MessagingException { + mailet.init(mailetConfig); + Mail mail = mock(Mail.class); + when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(ImmutableList.of()); + + expectedException.expect(MailetException.class); + + mailet.service(mail); + } + + @Test + public void serviceShouldLogWhenTimeoutException() throws Exception { + mailet.init(mailetConfig); + Mail mail = mock(Mail.class); + when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(ATTRIBUTE_CONTENT); + ConnectionFactory connectionFactory = mock(ConnectionFactory.class); + TimeoutException expectedLoggedException = new TimeoutException(); + when(connectionFactory.newConnection()).thenThrow(expectedLoggedException); + mailet.setConnectionFactory(connectionFactory); + + mailet.service(mail); + + verify(logger).error(any(String.class), eq(expectedLoggedException)); + } + + @Test + public void serviceShouldLogWhenIOException() throws Exception { + mailet.init(mailetConfig); + Mail mail = mock(Mail.class); + when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(ATTRIBUTE_CONTENT); + ConnectionFactory connectionFactory = mock(ConnectionFactory.class); + IOException expectedLoggedException = new IOException(); + when(connectionFactory.newConnection()).thenThrow(expectedLoggedException); + mailet.setConnectionFactory(connectionFactory); + + mailet.service(mail); + + verify(logger).error(any(String.class), eq(expectedLoggedException)); + } + + @Test + public void serviceShouldPublishAttributeContentWhenAttributeInMail() throws Exception { + mailet.init(mailetConfig); + Channel channel = mock(Channel.class); + Connection connection = mock(Connection.class); + when(connection.createChannel()).thenReturn(channel); + ConnectionFactory connectionFactory = mock(ConnectionFactory.class); + when(connectionFactory.newConnection()).thenReturn(connection); + mailet.setConnectionFactory(connectionFactory); + Mail mail = mock(Mail.class); + when(mail.getAttribute(MAIL_ATTRIBUTE)).thenReturn(ATTRIBUTE_CONTENT); + BasicProperties expectedProperties = new AMQP.BasicProperties(); + + mailet.service(mail); + + ArgumentCaptor<BasicProperties> basicPropertiesCaptor = ArgumentCaptor.forClass(BasicProperties.class); + verify(channel).basicPublish(eq(EXCHANGE_NAME), eq(ROUTING_KEY), basicPropertiesCaptor.capture(), eq(ATTACHMENT_CONTENT)); + assertThat(basicPropertiesCaptor.getValue()).isEqualToComparingFieldByField(expectedProperties); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/13b00ee9/mailet/standard/src/test/java/org/apache/james/transport/mailets/LogMessageTest.java ---------------------------------------------------------------------- diff --git a/mailet/standard/src/test/java/org/apache/james/transport/mailets/LogMessageTest.java b/mailet/standard/src/test/java/org/apache/james/transport/mailets/LogMessageTest.java index b037d33..487bce7 100644 --- a/mailet/standard/src/test/java/org/apache/james/transport/mailets/LogMessageTest.java +++ b/mailet/standard/src/test/java/org/apache/james/transport/mailets/LogMessageTest.java @@ -20,6 +20,8 @@ package org.apache.james.transport.mailets; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -98,8 +100,7 @@ public class LogMessageTest { verify(logger).info("Logging mail null"); verify(logger).info("\n"); verify(logger).info("Subject: subject\n"); - verify(logger).error("Error logging message."); - verify(logger).error("No MimeMessage content"); + verify(logger).error(eq("Error logging message."), any(MessagingException.class)); verifyNoMoreInteractions(logger); } @@ -139,8 +140,7 @@ public class LogMessageTest { mailet.service(mail); verify(logger).info("Logging mail name"); - verify(logger).error("Error logging message."); - verify(logger).error("exception message"); + verify(logger).error("Error logging message.", messagingException); verifyNoMoreInteractions(logger); } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org