JAMES-2167 Serializable attributes are not preserved by enqueue/dequeue on a JMS queue.
The non String email attributes now serialized/deserialized properly. Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/46ea987a Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/46ea987a Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/46ea987a Branch: refs/heads/master Commit: 46ea987a02583ae19ccac6c9f1c7952e343a0d74 Parents: b4065d5 Author: Edgar Asatryan <nst...@gmail.com> Authored: Sat Jul 7 00:03:44 2018 +0400 Committer: benwa <btell...@linagora.com> Committed: Tue Jul 17 09:35:56 2018 +0700 ---------------------------------------------------------------------- .../james/queue/api/MailQueueContract.java | 46 ++++++++ .../apache/james/queue/jms/JMSMailQueue.java | 79 ++++++-------- .../james/queue/jms/JMSSerializationUtils.java | 66 ++++++++++++ .../james/queue/jms/JMSMailQueueTest.java | 27 +---- .../queue/jms/JMSSerializationUtilsTest.java | 107 +++++++++++++++++++ 5 files changed, 252 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/46ea987a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java index 975d8ed..205c58b 100644 --- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java +++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java @@ -28,7 +28,9 @@ import static org.apache.mailet.base.MailAddressFixture.SENDER; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.io.Serializable; import java.util.Date; +import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -44,6 +46,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import com.github.fge.lambdas.Throwing; +import com.google.common.base.MoreObjects; import com.google.common.base.Strings; @ExtendWith(ExecutorExtension.class) @@ -203,6 +206,20 @@ public interface MailQueueContract { } @Test + default void queueShouldPreserveNonStringMailAttribute() throws Exception { + String attributeName = "any"; + SerializableAttribute attributeValue = new SerializableAttribute("value"); + getMailQueue().enQueue(defaultMail() + .attribute(attributeName, attributeValue) + .build()); + + MailQueue.MailQueueItem mailQueueItem = getMailQueue().deQueue(); + assertThat(mailQueueItem.getMail().getAttribute(attributeName)) + .isInstanceOf(SerializableAttribute.class) + .isEqualTo(attributeValue); + } + + @Test default void dequeueShouldBeFifo() throws Exception { String firstExpectedName = "name1"; getMailQueue().enQueue(defaultMail() @@ -305,4 +322,33 @@ public interface MailQueueContract { assertThat(tryDequeue.get().getMail().getName()).isEqualTo("name"); } + class SerializableAttribute implements Serializable { + private final String value; + + public SerializableAttribute(String value) { + this.value = value; + } + + @Override + public final boolean equals(Object o) { + if (o instanceof SerializableAttribute) { + SerializableAttribute that = (SerializableAttribute) o; + + return Objects.equals(this.value, that.value); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(value); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("value", value) + .toString(); + } + } } http://git-wip-us.apache.org/repos/asf/james-project/blob/46ea987a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java index 08e9e11..f31a23b 100644 --- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java +++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java @@ -20,7 +20,6 @@ package org.apache.james.queue.jms; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.Serializable; import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; @@ -28,7 +27,6 @@ import java.util.ArrayList; import java.util.Date; import java.util.Enumeration; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -51,9 +49,7 @@ import javax.mail.MessagingException; import javax.mail.internet.AddressException; import javax.mail.internet.MimeMessage; -import org.apache.commons.codec.binary.Base64; import org.apache.commons.collections.iterators.EnumerationIterator; -import org.apache.commons.lang3.SerializationUtils; import org.apache.james.core.MailAddress; import org.apache.james.lifecycle.api.Disposable; import org.apache.james.metrics.api.Metric; @@ -73,6 +69,7 @@ import org.threeten.extra.Temporals; import com.github.fge.lambdas.Throwing; import com.google.common.base.Joiner; +import com.google.common.base.Splitter; import com.google.common.collect.Iterators; /** @@ -153,6 +150,9 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori protected final Queue queue; protected final MessageProducer producer; + private final Joiner joiner; + private final Splitter splitter; + public JMSMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, String queueName, MetricFactory metricFactory) { try { connection = connectionFactory.createConnection(); @@ -166,6 +166,10 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori this.enqueuedMailsMetric = metricFactory.generate("enqueuedMail:" + queueName); this.mailQueueSize = metricFactory.generate("mailQueueSize:" + queueName); + this.joiner = Joiner.on(JAMES_MAIL_SEPARATOR).skipNulls(); + this.splitter = Splitter.on(JAMES_MAIL_SEPARATOR) + .omitEmptyStrings() // ignore null values. See JAMES-1294 + .trimResults(); try { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); queue = session.createQueue(queueName); @@ -313,11 +317,10 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori // won't serialize the empty headers so it is mandatory // to handle nulls when reconstructing mail from message if (!mail.getPerRecipientSpecificHeaders().getHeadersByRecipient().isEmpty()) { - byte[] serialize = SerializationUtils.serialize(mail.getPerRecipientSpecificHeaders()); - props.put(JAMES_MAIL_PER_RECIPIENT_HEADERS, Base64.encodeBase64String(serialize)); + props.put(JAMES_MAIL_PER_RECIPIENT_HEADERS, JMSSerializationUtils.serialize(mail.getPerRecipientSpecificHeaders())); } - String recipientsAsString = Joiner.on(JAMES_MAIL_SEPARATOR).skipNulls().join(mail.getRecipients()); + String recipientsAsString = joiner.join(mail.getRecipients()); props.put(JAMES_MAIL_RECIPIENTS, recipientsAsString); props.put(JAMES_MAIL_REMOTEADDR, mail.getRemoteAddr()); @@ -325,22 +328,13 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori String sender = Optional.ofNullable(mail.getSender()).map(MailAddress::asString).orElse(""); - StringBuilder attrsBuilder = new StringBuilder(); - Iterator<String> attrs = mail.getAttributeNames(); - while (attrs.hasNext()) { - String attrName = attrs.next(); - attrsBuilder.append(attrName); - - Object value = convertAttributeValue(mail.getAttribute(attrName)); - props.put(attrName, value); + org.apache.james.util.streams.Iterators.toStream(mail.getAttributeNames()) + .forEach(attrName -> props.put(attrName, JMSSerializationUtils.serialize(mail.getAttribute(attrName)))); - if (attrs.hasNext()) { - attrsBuilder.append(JAMES_MAIL_SEPARATOR); - } - } - props.put(JAMES_MAIL_ATTRIBUTE_NAMES, attrsBuilder.toString()); + props.put(JAMES_MAIL_ATTRIBUTE_NAMES, joiner.join(mail.getAttributeNames())); props.put(JAMES_MAIL_SENDER, sender); props.put(JAMES_MAIL_STATE, mail.getState()); + return props; } @@ -392,10 +386,7 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori mail.setLastUpdated(new Date(message.getLongProperty(JAMES_MAIL_LAST_UPDATED))); mail.setName(message.getStringProperty(JAMES_MAIL_NAME)); - Optional.ofNullable(message.getStringProperty(JAMES_MAIL_PER_RECIPIENT_HEADERS)) - .map(String::getBytes) - .map(Throwing.function(Base64::decodeBase64)) - .<PerRecipientHeaders>map(SerializationUtils::deserialize) + Optional.ofNullable(JMSSerializationUtils.<PerRecipientHeaders>deserialize(message.getStringProperty(JAMES_MAIL_PER_RECIPIENT_HEADERS))) .ifPresent(mail::addAllSpecificHeaderForRecipient); List<MailAddress> rcpts = new ArrayList<>(); @@ -417,23 +408,9 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori mail.setRemoteHost(message.getStringProperty(JAMES_MAIL_REMOTEHOST)); String attributeNames = message.getStringProperty(JAMES_MAIL_ATTRIBUTE_NAMES); - StringTokenizer namesTokenizer = new StringTokenizer(attributeNames, JAMES_MAIL_SEPARATOR); - while (namesTokenizer.hasMoreTokens()) { - String name = namesTokenizer.nextToken(); - - // Now cast the property back to Serializable and set it as attribute. - // See JAMES-1241 - Object attrValue = message.getObjectProperty(name); - - // ignore null values. See JAMES-1294 - if (attrValue != null) { - if (attrValue instanceof Serializable) { - mail.setAttribute(name, (Serializable) attrValue); - } else { - LOGGER.error("Not supported mail attribute {} of type {} for mail {}", name, attrValue, mail.getName()); - } - } - } + + splitter.split(attributeNames) + .forEach(name -> setMailAttribute(message, mail, name)); String sender = message.getStringProperty(JAMES_MAIL_SENDER); if (sender == null || sender.trim().length() <= 0) { @@ -454,16 +431,22 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori } /** - * Convert the attribute value if necessary. + * Retrieves the attribute by {@code name} form {@code message} and tries to add it on {@code mail}. * - * @param value - * @return convertedValue + * @param message The attribute source. + * @param mail The mail on which attribute should be set. + * @param name The attribute name. */ - protected Object convertAttributeValue(Object value) { - if (value == null || value instanceof String || value instanceof Byte || value instanceof Long || value instanceof Double || value instanceof Boolean || value instanceof Integer || value instanceof Short || value instanceof Float) { - return value; + private void setMailAttribute(Message message, Mail mail, String name) { + // Now cast the property back to Serializable and set it as attribute. + // See JAMES-1241 + Object attrValue = Throwing.function(message::getObjectProperty).apply(name); + + if (attrValue instanceof String) { + mail.setAttribute(name, JMSSerializationUtils.deserialize((String) attrValue)); + } else { + LOGGER.error("Not supported mail attribute {} of type {} for mail {}", name, attrValue, mail.getName()); } - return value.toString(); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/46ea987a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSSerializationUtils.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSSerializationUtils.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSSerializationUtils.java new file mode 100644 index 0000000..98a0ac7 --- /dev/null +++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSSerializationUtils.java @@ -0,0 +1,66 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.queue.jms; + +import java.io.Serializable; +import java.util.Optional; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang3.SerializationUtils; + +import com.github.fge.lambdas.Throwing; + +/** + * This class is similar to {@link SerializationUtils}. Unlike {@link SerializationUtils} this class operates with + * {@code String}s and not byte arrays. + * <p> + * The main advantage of this utility is that it introduces an additional operation after serialization and before + * deserialization. The operation consists in encoding/decoding the serialized/deserialized data in Base64, so that data + * can be safely transmitted over the wire. + */ +public class JMSSerializationUtils { + /** + * Serialize the input object using standard mechanisms then encodes result using base64 encoding. + * + * @param obj The object that needs to be serialized. + * + * @return The base64 representation of {@code obj}. + */ + public static String serialize(Serializable obj) { + return Optional.ofNullable(obj) + .map(SerializationUtils::serialize) + .map(Base64::encodeBase64String) + .orElse(null); + } + + /** + * Decodes the input base64 string and deserialize it. + * + * @param <T> The resulting type after deserialization. + * @param object The base64 encoded string. + * + * @return The deserialized object. + */ + public static <T extends Serializable> T deserialize(String object) { + return Optional.ofNullable(object) + .map(Throwing.function(Base64::decodeBase64)) + .<T>map(SerializationUtils::deserialize) + .orElse(null); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/46ea987a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java index aa6a6e4..826f42b 100644 --- a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java +++ b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java @@ -19,8 +19,6 @@ package org.apache.james.queue.jms; -import java.util.concurrent.ExecutorService; - import javax.jms.ConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory; @@ -44,7 +42,7 @@ public class JMSMailQueueTest implements DelayedManageableMailQueueContract, Pri private JMSMailQueue mailQueue; @BeforeEach - public void setUp(BrokerService broker) throws Exception { + void setUp(BrokerService broker) { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false"); RawMailQueueItemDecoratorFactory mailQueueItemDecoratorFactory = new RawMailQueueItemDecoratorFactory(); NoopMetricFactory metricFactory = new NoopMetricFactory(); @@ -53,7 +51,7 @@ public class JMSMailQueueTest implements DelayedManageableMailQueueContract, Pri } @AfterEach - public void tearDown() throws Exception { + void tearDown() { mailQueue.dispose(); } @@ -83,20 +81,6 @@ public class JMSMailQueueTest implements DelayedManageableMailQueueContract, Pri @Test @Override - @Disabled("JAMES-2301 Per recipients headers are not attached to the message.") - public void queueShouldPreservePerRecipientHeaders() { - - } - - @Test - @Override - @Disabled("JAMES-2296 Not handled by JMS mailqueue. Only single recipient per-recipient removal works") - public void removeByRecipientShouldRemoveSpecificEmailWhenMultipleRecipients() { - - } - - @Test - @Override @Disabled("JAMES-2308 Flushing JMS mail queue randomly re-order them" + "Random test failing around 1% of the time") public void flushShouldPreserveBrowseOrder() { @@ -105,13 +89,6 @@ public class JMSMailQueueTest implements DelayedManageableMailQueueContract, Pri @Test @Override - @Disabled("JAMES-2309 Long overflow in JMS delays") - public void enqueueWithVeryLongDelayShouldDelayMail(ExecutorService executorService) { - - } - - @Test - @Override @Disabled("JAMES-2312 JMS clear mailqueue can ommit some messages" + "Random test failing around 1% of the time") public void clearShouldRemoveAllElements() { http://git-wip-us.apache.org/repos/asf/james-project/blob/46ea987a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSSerializationUtilsTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSSerializationUtilsTest.java b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSSerializationUtilsTest.java new file mode 100644 index 0000000..6ab010f --- /dev/null +++ b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSSerializationUtilsTest.java @@ -0,0 +1,107 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.queue.jms; + +import static org.apache.james.queue.jms.JMSSerializationUtils.deserialize; +import static org.apache.james.queue.jms.JMSSerializationUtils.serialize; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; + +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.Optional; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang3.SerializationException; +import org.junit.jupiter.api.Test; + +class JMSSerializationUtilsTest { + /** + * Serializes and deserializes the provided object. + * + * @param obj The object that needs to be serialized. + * @param <T> The type of the provided object. + * + * @return The provided object. + */ + private static <T extends Serializable> T roundtrip(T obj) { + return Optional.ofNullable(obj) + .map(JMSSerializationUtils::serialize) + .<T>map(JMSSerializationUtils::deserialize) + .orElseThrow(() -> new IllegalArgumentException("Cannot serialize/deserialize: " + obj)); + } + + @Test + void trySerializeShouldReturnString() { + SerializableStringHolder value = new SerializableStringHolder("value"); + + String serializedIntegerString = "rO0ABXNyAE1vcmcuYXBhY2hlLmphbWVzLnF1ZXVlLmptcy5KTVNTZXJpYWxp" + + "emF0aW9uVXRpbHNUZXN0JFNlcmlhbGl6YWJsZVN0cmluZ0hvbGRlcsy4/DEA" + + "8nRZAgABTAAFdmFsdWV0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQABXZhbHVl"; + + String actual = serialize(value); + + assertThat(actual).isEqualTo(serializedIntegerString); + } + + @Test + void roundTripShouldReturnEqualObject() { + SerializableStringHolder expected = new SerializableStringHolder("value"); + + assertThat(roundtrip(expected)).isEqualTo(expected); + } + + @Test + void deserializeShouldThrowWhenNotBase64StringProvided() { + assertThatExceptionOfType(SerializationException.class) + .isThrownBy(() -> deserialize("abc")); + } + + @Test + void deserializeShouldThrowWhenNotSerializedBytesAreEncodedInBase64() { + assertThatExceptionOfType(SerializationException.class) + .isThrownBy(() -> deserialize(Base64.encodeBase64String("abc".getBytes(StandardCharsets.UTF_8)))); + } + + private static class SerializableStringHolder implements Serializable { + private final String value; + + SerializableStringHolder(String value) { + this.value = value; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof SerializableStringHolder)) { + return false; + } + SerializableStringHolder that = (SerializableStringHolder) o; + return Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(value); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org