JAMES-2540 pass ack function to MailQueueItem

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/89ea16a9
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/89ea16a9
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/89ea16a9

Branch: refs/heads/master
Commit: 89ea16a95cb00bc96c8074961b75d170ccd6eec3
Parents: c8921e5
Author: Matthieu Baechler <[email protected]>
Authored: Tue Sep 11 16:54:24 2018 +0200
Committer: Antoine Duprat <[email protected]>
Committed: Mon Sep 17 08:13:25 2018 +0200

----------------------------------------------------------------------
 .../apache/james/queue/rabbitmq/Dequeuer.java   | 63 +++++++++++---------
 1 file changed, 36 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/89ea16a9/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 89a40f4..03dafa4 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -22,22 +22,18 @@ package org.apache.james.queue.rabbitmq;
 import static org.apache.james.queue.api.MailQueue.DEQUEUED_METRIC_NAME_PREFIX;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
-import javax.mail.MessagingException;
-import javax.mail.internet.AddressException;
-import javax.mail.internet.MimeMessage;
-
-import org.apache.james.blob.api.BlobId;
-import org.apache.james.blob.api.Store;
-import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.metrics.api.Metric;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.mailet.Mail;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.fge.lambdas.Throwing;
+import com.github.fge.lambdas.consumers.ThrowingConsumer;
 import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
 import com.rabbitmq.client.GetResponse;
 
@@ -46,13 +42,11 @@ class Dequeuer {
     }
 
     private static class RabbitMQMailQueueItem implements 
MailQueue.MailQueueItem {
-        private final RabbitClient rabbitClient;
-        private final long deliveryTag;
+        private final Consumer<Boolean> ack;
         private final Mail mail;
 
-        private RabbitMQMailQueueItem(RabbitClient rabbitClient, long 
deliveryTag, Mail mail) {
-            this.rabbitClient = rabbitClient;
-            this.deliveryTag = deliveryTag;
+        private RabbitMQMailQueueItem(Consumer<Boolean> ack, Mail mail) {
+            this.ack = ack;
             this.mail = mail;
         }
 
@@ -63,11 +57,7 @@ class Dequeuer {
 
         @Override
         public void done(boolean success) throws MailQueue.MailQueueException {
-            try {
-                rabbitClient.ack(deliveryTag);
-            } catch (IOException e) {
-                throw new MailQueue.MailQueueException("Failed to ACK " + 
mail.getName() + " with delivery tag " + deliveryTag, e);
-            }
+            ack.accept(success);
         }
     }
 
@@ -88,14 +78,34 @@ class Dequeuer {
     }
 
     MailQueue.MailQueueItem deQueue() throws MailQueue.MailQueueException {
-        GetResponse getResponse = pollChannel();
-        MailReferenceDTO mailDTO = toDTO(getResponse);
-        Mail mail = mailLoader.apply(mailDTO);
-        dequeueMetric.increment();
-        return new RabbitMQMailQueueItem(rabbitClient, 
getResponse.getEnvelope().getDeliveryTag(), mail);
+        return pollChannel()
+            .thenApply(Throwing.function(this::loadItem).sneakyThrow())
+            .join();
     }
 
-    private MailReferenceDTO toDTO(GetResponse getResponse) throws 
MailQueue.MailQueueException {
+    private RabbitMQMailQueueItem loadItem(GetResponse response) throws 
MailQueue.MailQueueException {
+        Mail mail = loadMail(response);
+        ThrowingConsumer<Boolean> ack = 
ack(response.getEnvelope().getDeliveryTag(), mail);
+        return new RabbitMQMailQueueItem(ack, mail);
+    }
+
+    private ThrowingConsumer<Boolean> ack(long deliveryTag, Mail mail) {
+        return sucess -> {
+            try {
+                dequeueMetric.increment();
+                rabbitClient.ack(deliveryTag);
+            } catch (IOException e) {
+                throw new MailQueue.MailQueueException("Failed to ACK " + 
mail.getName() + " with delivery tag " + deliveryTag, e);
+            }
+        };
+    }
+
+    private Mail loadMail(GetResponse response) throws 
MailQueue.MailQueueException {
+        MailReferenceDTO mailDTO = toMailReference(response);
+        return mailLoader.apply(mailDTO);
+    }
+
+    private MailReferenceDTO toMailReference(GetResponse getResponse) throws 
MailQueue.MailQueueException {
         try {
             return mailReferenceSerializer.read(getResponse.getBody());
         } catch (IOException e) {
@@ -103,13 +113,12 @@ class Dequeuer {
         }
     }
 
-    private GetResponse pollChannel() {
+    private CompletableFuture<GetResponse> pollChannel() {
         return new 
AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())
             .withFixedRate()
             .withMinDelay(TEN_MS)
             .retryOn(NoMailYetException.class)
-            .getWithRetry(this::singleChannelRead)
-            .join();
+            .getWithRetry(this::singleChannelRead);
     }
 
     private GetResponse singleChannelRead() throws IOException {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to