JAMES-2540 pass ack function to MailQueueItem
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/89ea16a9 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/89ea16a9 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/89ea16a9 Branch: refs/heads/master Commit: 89ea16a95cb00bc96c8074961b75d170ccd6eec3 Parents: c8921e5 Author: Matthieu Baechler <[email protected]> Authored: Tue Sep 11 16:54:24 2018 +0200 Committer: Antoine Duprat <[email protected]> Committed: Mon Sep 17 08:13:25 2018 +0200 ---------------------------------------------------------------------- .../apache/james/queue/rabbitmq/Dequeuer.java | 63 +++++++++++--------- 1 file changed, 36 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/89ea16a9/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java index 89a40f4..03dafa4 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java @@ -22,22 +22,18 @@ package org.apache.james.queue.rabbitmq; import static org.apache.james.queue.api.MailQueue.DEQUEUED_METRIC_NAME_PREFIX; import java.io.IOException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; +import java.util.function.Consumer; import java.util.function.Function; -import javax.mail.MessagingException; -import javax.mail.internet.AddressException; -import javax.mail.internet.MimeMessage; - -import org.apache.james.blob.api.BlobId; -import org.apache.james.blob.api.Store; -import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.metrics.api.Metric; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.queue.api.MailQueue; import org.apache.mailet.Mail; -import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.fge.lambdas.Throwing; +import com.github.fge.lambdas.consumers.ThrowingConsumer; import com.nurkiewicz.asyncretry.AsyncRetryExecutor; import com.rabbitmq.client.GetResponse; @@ -46,13 +42,11 @@ class Dequeuer { } private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem { - private final RabbitClient rabbitClient; - private final long deliveryTag; + private final Consumer<Boolean> ack; private final Mail mail; - private RabbitMQMailQueueItem(RabbitClient rabbitClient, long deliveryTag, Mail mail) { - this.rabbitClient = rabbitClient; - this.deliveryTag = deliveryTag; + private RabbitMQMailQueueItem(Consumer<Boolean> ack, Mail mail) { + this.ack = ack; this.mail = mail; } @@ -63,11 +57,7 @@ class Dequeuer { @Override public void done(boolean success) throws MailQueue.MailQueueException { - try { - rabbitClient.ack(deliveryTag); - } catch (IOException e) { - throw new MailQueue.MailQueueException("Failed to ACK " + mail.getName() + " with delivery tag " + deliveryTag, e); - } + ack.accept(success); } } @@ -88,14 +78,34 @@ class Dequeuer { } MailQueue.MailQueueItem deQueue() throws MailQueue.MailQueueException { - GetResponse getResponse = pollChannel(); - MailReferenceDTO mailDTO = toDTO(getResponse); - Mail mail = mailLoader.apply(mailDTO); - dequeueMetric.increment(); - return new RabbitMQMailQueueItem(rabbitClient, getResponse.getEnvelope().getDeliveryTag(), mail); + return pollChannel() + .thenApply(Throwing.function(this::loadItem).sneakyThrow()) + .join(); } - private MailReferenceDTO toDTO(GetResponse getResponse) throws MailQueue.MailQueueException { + private RabbitMQMailQueueItem loadItem(GetResponse response) throws MailQueue.MailQueueException { + Mail mail = loadMail(response); + ThrowingConsumer<Boolean> ack = ack(response.getEnvelope().getDeliveryTag(), mail); + return new RabbitMQMailQueueItem(ack, mail); + } + + private ThrowingConsumer<Boolean> ack(long deliveryTag, Mail mail) { + return sucess -> { + try { + dequeueMetric.increment(); + rabbitClient.ack(deliveryTag); + } catch (IOException e) { + throw new MailQueue.MailQueueException("Failed to ACK " + mail.getName() + " with delivery tag " + deliveryTag, e); + } + }; + } + + private Mail loadMail(GetResponse response) throws MailQueue.MailQueueException { + MailReferenceDTO mailDTO = toMailReference(response); + return mailLoader.apply(mailDTO); + } + + private MailReferenceDTO toMailReference(GetResponse getResponse) throws MailQueue.MailQueueException { try { return mailReferenceSerializer.read(getResponse.getBody()); } catch (IOException e) { @@ -103,13 +113,12 @@ class Dequeuer { } } - private GetResponse pollChannel() { + private CompletableFuture<GetResponse> pollChannel() { return new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()) .withFixedRate() .withMinDelay(TEN_MS) .retryOn(NoMailYetException.class) - .getWithRetry(this::singleChannelRead) - .join(); + .getWithRetry(this::singleChannelRead); } private GetResponse singleChannelRead() throws IOException { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
