JAMES-2541 Ack only when successful

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/46b1bfc2
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/46b1bfc2
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/46b1bfc2

Branch: refs/heads/master
Commit: 46b1bfc2b85bc269ee5c0f0c7267645e30aaebb5
Parents: 89ea16a
Author: Benoit Tellier <[email protected]>
Authored: Wed Sep 12 16:25:51 2018 +0700
Committer: Antoine Duprat <[email protected]>
Committed: Mon Sep 17 08:13:26 2018 +0200

----------------------------------------------------------------------
 .../org/apache/james/queue/api/MailQueueContract.java     |  4 +---
 .../java/org/apache/james/queue/rabbitmq/Dequeuer.java    | 10 +++++++---
 .../org/apache/james/queue/rabbitmq/RabbitClient.java     |  6 ++++++
 3 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/46b1bfc2/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
 
b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
index ac68e87..e2df16f 100644
--- 
a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
+++ 
b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
 
 import javax.mail.internet.MimeMessage;
 
@@ -295,9 +296,6 @@ public interface MailQueueContract {
         getMailQueue().enQueue(defaultMail()
             .name("name1")
             .build());
-        getMailQueue().enQueue(defaultMail()
-            .name("name1")
-            .build());
 
         MailQueue.MailQueueItem mailQueueItem1 = getMailQueue().deQueue();
         mailQueueItem1.done(false);

http://git-wip-us.apache.org/repos/asf/james-project/blob/46b1bfc2/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 03dafa4..7b81bb6 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -90,10 +90,14 @@ class Dequeuer {
     }
 
     private ThrowingConsumer<Boolean> ack(long deliveryTag, Mail mail) {
-        return sucess -> {
+        return success -> {
             try {
-                dequeueMetric.increment();
-                rabbitClient.ack(deliveryTag);
+                if (success) {
+                    dequeueMetric.increment();
+                    rabbitClient.ack(deliveryTag);
+                } else {
+                    rabbitClient.nack(deliveryTag);
+                }
             } catch (IOException e) {
                 throw new MailQueue.MailQueueException("Failed to ACK " + 
mail.getName() + " with delivery tag " + deliveryTag, e);
             }

http://git-wip-us.apache.org/repos/asf/james-project/blob/46b1bfc2/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
index ae6b1be..8757e33 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
@@ -38,6 +38,7 @@ class RabbitClient {
     private static final boolean MULTIPLE = true;
     private static final ImmutableMap<String, Object> NO_ARGUMENTS = 
ImmutableMap.of();
     private static final String ROUTING_KEY = "";
+    public static final boolean REQUEUE = true;
 
     private final RabbitChannelPool channelPool;
 
@@ -72,6 +73,11 @@ class RabbitClient {
         channelPool.execute(consumer);
     }
 
+    void nack(long deliveryTag) throws IOException {
+        RabbitChannelPool.RabbitConsumer<IOException> consumer = channel -> 
channel.basicNack(deliveryTag, !MULTIPLE, REQUEUE);
+        channelPool.execute(consumer);
+    }
+
     Optional<GetResponse> poll(MailQueueName name) throws IOException {
         RabbitChannelPool.RabbitFunction<Optional<GetResponse>, IOException> f 
= channel ->
             
Optional.ofNullable(channel.basicGet(name.toWorkQueueName().asString(), 
!AUTO_ACK));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to