Repository: james-project Updated Branches: refs/heads/master 6e8d222ac -> 4ac0a34af
JAMES-2540 Split RabbitMQ Mail Queue class in smaller chuncks Have a package-visible class for each mail queue operation to help readability Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/994f5b1f Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/994f5b1f Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/994f5b1f Branch: refs/heads/master Commit: 994f5b1f27367f938156653f0d9c19e55532c189 Parents: fc03482 Author: Benoit Tellier <btell...@linagora.com> Authored: Tue Sep 11 10:07:52 2018 +0700 Committer: Benoit Tellier <btell...@linagora.com> Committed: Fri Sep 14 11:05:42 2018 +0700 ---------------------------------------------------------------------- .../apache/james/queue/rabbitmq/Dequeuer.java | 182 ++++++++++++++++++ .../apache/james/queue/rabbitmq/Enqueuer.java | 79 ++++++++ .../james/queue/rabbitmq/RabbitMQMailQueue.java | 189 ++----------------- 3 files changed, 278 insertions(+), 172 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/994f5b1f/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java new file mode 100644 index 0000000..f5b690d --- /dev/null +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java @@ -0,0 +1,182 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq; + +import static org.apache.james.queue.api.MailQueue.DEQUEUED_METRIC_NAME_PREFIX; + +import java.io.IOException; +import java.io.Serializable; +import java.time.Instant; +import java.util.Date; +import java.util.concurrent.Executors; + +import javax.mail.MessagingException; +import javax.mail.internet.AddressException; +import javax.mail.internet.MimeMessage; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.Store; +import org.apache.james.blob.mail.MimeMessagePartsId; +import org.apache.james.core.MailAddress; +import org.apache.james.metrics.api.Metric; +import org.apache.james.metrics.api.MetricFactory; +import org.apache.james.queue.api.MailQueue; +import org.apache.james.server.core.MailImpl; +import org.apache.james.util.SerializationUtil; +import org.apache.mailet.Mail; +import org.apache.mailet.PerRecipientHeaders; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.fge.lambdas.Throwing; +import com.github.steveash.guavate.Guavate; +import com.nurkiewicz.asyncretry.AsyncRetryExecutor; +import com.rabbitmq.client.GetResponse; + +class Dequeuer { + private static class NoMailYetException extends RuntimeException { + } + + private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem { + private final RabbitClient rabbitClient; + private final long deliveryTag; + private final Mail mail; + + private RabbitMQMailQueueItem(RabbitClient rabbitClient, long deliveryTag, Mail mail) { + this.rabbitClient = rabbitClient; + this.deliveryTag = deliveryTag; + this.mail = mail; + } + + @Override + public Mail getMail() { + return mail; + } + + @Override + public void done(boolean success) throws MailQueue.MailQueueException { + try { + rabbitClient.ack(deliveryTag); + } catch (IOException e) { + throw new MailQueue.MailQueueException("Failed to ACK " + mail.getName() + " with delivery tag " + deliveryTag, e); + } + } + } + + private static final int TEN_MS = 10; + + private final MailQueueName name; + private final RabbitClient rabbitClient; + private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore; + private final BlobId.Factory blobIdFactory; + private final ObjectMapper objectMapper; + private final Metric dequeueMetric; + + Dequeuer(MailQueueName name, RabbitClient rabbitClient, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, BlobId.Factory blobIdFactory, ObjectMapper objectMapper, MetricFactory metricFactory) { + this.name = name; + this.rabbitClient = rabbitClient; + this.mimeMessageStore = mimeMessageStore; + this.blobIdFactory = blobIdFactory; + this.objectMapper = objectMapper; + this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString()); + } + + MailQueue.MailQueueItem deQueue() throws MailQueue.MailQueueException { + GetResponse getResponse = pollChannel(); + MailDTO mailDTO = toDTO(getResponse); + Mail mail = toMail(mailDTO); + dequeueMetric.increment(); + return new RabbitMQMailQueueItem(rabbitClient, getResponse.getEnvelope().getDeliveryTag(), mail); + } + + private MailDTO toDTO(GetResponse getResponse) throws MailQueue.MailQueueException { + try { + return objectMapper.readValue(getResponse.getBody(), MailDTO.class); + } catch (IOException e) { + throw new MailQueue.MailQueueException("Failed to parse DTO", e); + } + } + + private GetResponse pollChannel() { + return new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()) + .withFixedRate() + .withMinDelay(TEN_MS) + .retryOn(NoMailYetException.class) + .getWithRetry(this::singleChannelRead) + .join(); + } + + private GetResponse singleChannelRead() throws IOException { + return rabbitClient.poll(name) + .filter(getResponse -> getResponse.getBody() != null) + .orElseThrow(NoMailYetException::new); + } + + private Mail toMail(MailDTO dto) throws MailQueue.MailQueueException { + try { + MimeMessage mimeMessage = mimeMessageStore.read( + MimeMessagePartsId.builder() + .headerBlobId(blobIdFactory.from(dto.getHeaderBlobId())) + .bodyBlobId(blobIdFactory.from(dto.getBodyBlobId())) + .build()) + .join(); + + MailImpl mail = new MailImpl( + dto.getName(), + dto.getSender().map(MailAddress::getMailSender).orElse(null), + dto.getRecipients() + .stream() + .map(Throwing.<String, MailAddress>function(MailAddress::new).sneakyThrow()) + .collect(Guavate.toImmutableList()), + mimeMessage); + + mail.setErrorMessage(dto.getErrorMessage()); + mail.setRemoteAddr(dto.getRemoteAddr()); + mail.setRemoteHost(dto.getRemoteHost()); + mail.setState(dto.getState()); + dto.getLastUpdated() + .map(Instant::toEpochMilli) + .map(Date::new) + .ifPresent(mail::setLastUpdated); + + dto.getAttributes() + .forEach((name, value) -> mail.setAttribute(name, SerializationUtil.<Serializable>deserialize(value))); + + mail.addAllSpecificHeaderForRecipient(retrievePerRecipientHeaders(dto)); + + return mail; + } catch (AddressException e) { + throw new MailQueue.MailQueueException("Failed to parse mail address", e); + } catch (MessagingException e) { + throw new MailQueue.MailQueueException("Failed to generate mime message", e); + } + } + + private PerRecipientHeaders retrievePerRecipientHeaders(MailDTO dto) { + PerRecipientHeaders perRecipientHeaders = new PerRecipientHeaders(); + dto.getPerRecipientHeaders() + .entrySet() + .stream() + .flatMap(entry -> entry.getValue().toHeaders().stream() + .map(Throwing.function(header -> Pair.of(new MailAddress(entry.getKey()), header)))) + .forEach(pair -> perRecipientHeaders.addHeaderForRecipient(pair.getValue(), pair.getKey())); + return perRecipientHeaders; + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/994f5b1f/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java new file mode 100644 index 0000000..ba655f8 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java @@ -0,0 +1,79 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq; + +import static org.apache.james.queue.api.MailQueue.ENQUEUED_METRIC_NAME_PREFIX; + +import java.util.concurrent.CompletableFuture; + +import javax.mail.MessagingException; +import javax.mail.internet.MimeMessage; + +import org.apache.james.blob.api.Store; +import org.apache.james.blob.mail.MimeMessagePartsId; +import org.apache.james.metrics.api.Metric; +import org.apache.james.metrics.api.MetricFactory; +import org.apache.james.queue.api.MailQueue; +import org.apache.mailet.Mail; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +class Enqueuer { + private final MailQueueName name; + private final RabbitClient rabbitClient; + private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore; + private final ObjectMapper objectMapper; + private final Metric enqueueMetric; + + Enqueuer(MailQueueName name, RabbitClient rabbitClient, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, + ObjectMapper objectMapper, MetricFactory metricFactory) { + this.name = name; + this.rabbitClient = rabbitClient; + this.mimeMessageStore = mimeMessageStore; + this.objectMapper = objectMapper; + this.enqueueMetric = metricFactory.generate(ENQUEUED_METRIC_NAME_PREFIX + name.asString()); + } + + void enQueue(Mail mail) throws MailQueue.MailQueueException { + MimeMessagePartsId partsId = saveBlobs(mail).join(); + MailDTO mailDTO = MailDTO.fromMail(mail, partsId); + byte[] message = getMessageBytes(mailDTO); + rabbitClient.publish(name, message); + + enqueueMetric.increment(); + } + + private CompletableFuture<MimeMessagePartsId> saveBlobs(Mail mail) throws MailQueue.MailQueueException { + try { + return mimeMessageStore.save(mail.getMessage()); + } catch (MessagingException e) { + throw new MailQueue.MailQueueException("Error while saving blob", e); + } + } + + private byte[] getMessageBytes(MailDTO mailDTO) throws MailQueue.MailQueueException { + try { + return objectMapper.writeValueAsBytes(mailDTO); + } catch (JsonProcessingException e) { + throw new MailQueue.MailQueueException("Unable to serialize message", e); + } + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/994f5b1f/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java index 8f9b68e..74f6ef7 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java @@ -19,85 +19,39 @@ package org.apache.james.queue.rabbitmq; -import java.io.IOException; -import java.io.Serializable; -import java.time.Instant; -import java.util.Date; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import javax.inject.Inject; -import javax.mail.MessagingException; -import javax.mail.internet.AddressException; import javax.mail.internet.MimeMessage; -import org.apache.commons.lang3.tuple.Pair; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.Store; import org.apache.james.blob.mail.MimeMessagePartsId; -import org.apache.james.core.MailAddress; import org.apache.james.metrics.api.GaugeRegistry; -import org.apache.james.metrics.api.Metric; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.queue.api.MailQueue; -import org.apache.james.server.core.MailImpl; -import org.apache.james.util.SerializationUtil; import org.apache.mailet.Mail; -import org.apache.mailet.PerRecipientHeaders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.guava.GuavaModule; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.github.fge.lambdas.Throwing; -import com.github.steveash.guavate.Guavate; import com.google.common.annotations.VisibleForTesting; -import com.nurkiewicz.asyncretry.AsyncRetryExecutor; -import com.rabbitmq.client.GetResponse; public class RabbitMQMailQueue implements MailQueue { private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQMailQueue.class); - private static class NoMailYetException extends RuntimeException { - } - - private static class RabbitMQMailQueueItem implements MailQueueItem { - private final RabbitClient rabbitClient; - private final long deliveryTag; - private final Mail mail; - - private RabbitMQMailQueueItem(RabbitClient rabbitClient, long deliveryTag, Mail mail) { - this.rabbitClient = rabbitClient; - this.deliveryTag = deliveryTag; - this.mail = mail; - } - - @Override - public Mail getMail() { - return mail; - } - - @Override - public void done(boolean success) throws MailQueueException { - try { - rabbitClient.ack(deliveryTag); - } catch (IOException e) { - throw new MailQueueException("Failed to ACK " + mail.getName() + " with delivery tag " + deliveryTag, e); - } - } - } - static class Factory { private final MetricFactory metricFactory; private final GaugeRegistry gaugeRegistry; private final RabbitClient rabbitClient; private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore; private final BlobId.Factory blobIdFactory; + private final ObjectMapper objectMapper; @Inject @VisibleForTesting Factory(MetricFactory metricFactory, GaugeRegistry gaugeRegistry, RabbitClient rabbitClient, @@ -107,40 +61,34 @@ public class RabbitMQMailQueue implements MailQueue { this.rabbitClient = rabbitClient; this.mimeMessageStore = mimeMessageStore; this.blobIdFactory = blobIdFactory; + this.objectMapper = new ObjectMapper() + .registerModule(new Jdk8Module()) + .registerModule(new JavaTimeModule()) + .registerModule(new GuavaModule()); } RabbitMQMailQueue create(MailQueueName mailQueueName) { - return new RabbitMQMailQueue(metricFactory, gaugeRegistry, mailQueueName, rabbitClient, mimeMessageStore, blobIdFactory); + return new RabbitMQMailQueue(metricFactory, gaugeRegistry, mailQueueName, + new Enqueuer(mailQueueName, rabbitClient, mimeMessageStore, objectMapper, metricFactory), + new Dequeuer(mailQueueName, rabbitClient, mimeMessageStore, blobIdFactory, objectMapper, metricFactory)); } } - private static final int TEN_MS = 10; - private final MailQueueName name; - private final RabbitClient rabbitClient; - private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore; - private final BlobId.Factory blobIdFactory; - private final ObjectMapper objectMapper; private final MetricFactory metricFactory; - private final Metric enqueueMetric; - private final Metric dequeueMetric; private final GaugeRegistry gaugeRegistry; + private final Enqueuer enqueuer; + private final Dequeuer dequeuer; + + RabbitMQMailQueue(MetricFactory metricFactory, GaugeRegistry gaugeRegistry, MailQueueName name, + Enqueuer enqueuer, Dequeuer dequeuer) { - RabbitMQMailQueue(MetricFactory metricFactory, GaugeRegistry gaugeRegistry, MailQueueName name, RabbitClient rabbitClient, - Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, BlobId.Factory blobIdFactory) { - this.mimeMessageStore = mimeMessageStore; - this.blobIdFactory = blobIdFactory; this.name = name; - this.rabbitClient = rabbitClient; - this.objectMapper = new ObjectMapper() - .registerModule(new Jdk8Module()) - .registerModule(new JavaTimeModule()) - .registerModule(new GuavaModule()); + this.enqueuer = enqueuer; + this.dequeuer = dequeuer; this.metricFactory = metricFactory; this.gaugeRegistry = gaugeRegistry; - this.enqueueMetric = metricFactory.generate(ENQUEUED_METRIC_NAME_PREFIX + name.asString()); - this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString()); } @Override @@ -159,115 +107,12 @@ public class RabbitMQMailQueue implements MailQueue { @Override public void enQueue(Mail mail) throws MailQueueException { metricFactory.withMetric(ENQUEUED_TIMER_METRIC_NAME_PREFIX + name.asString(), - Throwing.runnable(() -> { - MimeMessagePartsId partsId = saveBlobs(mail).join(); - MailDTO mailDTO = MailDTO.fromMail(mail, partsId); - byte[] message = getMessageBytes(mailDTO); - rabbitClient.publish(name, message); - - enqueueMetric.increment(); - }).sneakyThrow()); - } - - private CompletableFuture<MimeMessagePartsId> saveBlobs(Mail mail) throws MailQueueException { - try { - return mimeMessageStore.save(mail.getMessage()); - } catch (MessagingException e) { - throw new MailQueueException("Error while saving blob", e); - } - } - - private byte[] getMessageBytes(MailDTO mailDTO) throws MailQueueException { - try { - return objectMapper.writeValueAsBytes(mailDTO); - } catch (JsonProcessingException e) { - throw new MailQueueException("Unable to serialize message", e); - } + Throwing.runnable(() -> enqueuer.enQueue(mail)).sneakyThrow()); } @Override public MailQueueItem deQueue() throws MailQueueException { return metricFactory.withMetric(DEQUEUED_TIMER_METRIC_NAME_PREFIX + name.asString(), - Throwing.supplier(() -> { - GetResponse getResponse = pollChannel(); - MailDTO mailDTO = toDTO(getResponse); - Mail mail = toMail(mailDTO); - dequeueMetric.increment(); - return new RabbitMQMailQueueItem(rabbitClient, getResponse.getEnvelope().getDeliveryTag(), mail); - }).sneakyThrow()); - } - - private MailDTO toDTO(GetResponse getResponse) throws MailQueueException { - try { - return objectMapper.readValue(getResponse.getBody(), MailDTO.class); - } catch (IOException e) { - throw new MailQueueException("Failed to parse DTO", e); - } - } - - private GetResponse pollChannel() { - return new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()) - .withFixedRate() - .withMinDelay(TEN_MS) - .retryOn(NoMailYetException.class) - .getWithRetry(this::singleChannelRead) - .join(); - } - - private GetResponse singleChannelRead() throws IOException { - return rabbitClient.poll(name) - .filter(getResponse -> getResponse.getBody() != null) - .orElseThrow(NoMailYetException::new); - } - - private Mail toMail(MailDTO dto) throws MailQueueException { - try { - MimeMessage mimeMessage = mimeMessageStore.read( - MimeMessagePartsId.builder() - .headerBlobId(blobIdFactory.from(dto.getHeaderBlobId())) - .bodyBlobId(blobIdFactory.from(dto.getBodyBlobId())) - .build()) - .join(); - - MailImpl mail = new MailImpl( - dto.getName(), - dto.getSender().map(MailAddress::getMailSender).orElse(null), - dto.getRecipients() - .stream() - .map(Throwing.<String, MailAddress>function(MailAddress::new).sneakyThrow()) - .collect(Guavate.toImmutableList()), - mimeMessage); - - mail.setErrorMessage(dto.getErrorMessage()); - mail.setRemoteAddr(dto.getRemoteAddr()); - mail.setRemoteHost(dto.getRemoteHost()); - mail.setState(dto.getState()); - dto.getLastUpdated() - .map(Instant::toEpochMilli) - .map(Date::new) - .ifPresent(mail::setLastUpdated); - - dto.getAttributes() - .forEach((name, value) -> mail.setAttribute(name, SerializationUtil.<Serializable>deserialize(value))); - - mail.addAllSpecificHeaderForRecipient(retrievePerRecipientHeaders(dto)); - - return mail; - } catch (AddressException e) { - throw new MailQueueException("Failed to parse mail address", e); - } catch (MessagingException e) { - throw new MailQueueException("Failed to generate mime message", e); - } - } - - private PerRecipientHeaders retrievePerRecipientHeaders(MailDTO dto) { - PerRecipientHeaders perRecipientHeaders = new PerRecipientHeaders(); - dto.getPerRecipientHeaders() - .entrySet() - .stream() - .flatMap(entry -> entry.getValue().toHeaders().stream() - .map(Throwing.function(header -> Pair.of(new MailAddress(entry.getKey()), header)))) - .forEach(pair -> perRecipientHeaders.addHeaderForRecipient(pair.getValue(), pair.getKey())); - return perRecipientHeaders; + Throwing.supplier(dequeuer::deQueue).sneakyThrow()); } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org