JAMES-2541 Ack only when successful
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/46b1bfc2 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/46b1bfc2 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/46b1bfc2 Branch: refs/heads/master Commit: 46b1bfc2b85bc269ee5c0f0c7267645e30aaebb5 Parents: 89ea16a Author: Benoit Tellier <[email protected]> Authored: Wed Sep 12 16:25:51 2018 +0700 Committer: Antoine Duprat <[email protected]> Committed: Mon Sep 17 08:13:26 2018 +0200 ---------------------------------------------------------------------- .../org/apache/james/queue/api/MailQueueContract.java | 4 +--- .../java/org/apache/james/queue/rabbitmq/Dequeuer.java | 10 +++++++--- .../org/apache/james/queue/rabbitmq/RabbitClient.java | 6 ++++++ 3 files changed, 14 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/46b1bfc2/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java index ac68e87..e2df16f 100644 --- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java +++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java @@ -35,6 +35,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; import javax.mail.internet.MimeMessage; @@ -295,9 +296,6 @@ public interface MailQueueContract { getMailQueue().enQueue(defaultMail() .name("name1") .build()); - getMailQueue().enQueue(defaultMail() - .name("name1") - .build()); MailQueue.MailQueueItem mailQueueItem1 = getMailQueue().deQueue(); mailQueueItem1.done(false); http://git-wip-us.apache.org/repos/asf/james-project/blob/46b1bfc2/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java index 03dafa4..7b81bb6 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java @@ -90,10 +90,14 @@ class Dequeuer { } private ThrowingConsumer<Boolean> ack(long deliveryTag, Mail mail) { - return sucess -> { + return success -> { try { - dequeueMetric.increment(); - rabbitClient.ack(deliveryTag); + if (success) { + dequeueMetric.increment(); + rabbitClient.ack(deliveryTag); + } else { + rabbitClient.nack(deliveryTag); + } } catch (IOException e) { throw new MailQueue.MailQueueException("Failed to ACK " + mail.getName() + " with delivery tag " + deliveryTag, e); } http://git-wip-us.apache.org/repos/asf/james-project/blob/46b1bfc2/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java index ae6b1be..8757e33 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java @@ -38,6 +38,7 @@ class RabbitClient { private static final boolean MULTIPLE = true; private static final ImmutableMap<String, Object> NO_ARGUMENTS = ImmutableMap.of(); private static final String ROUTING_KEY = ""; + public static final boolean REQUEUE = true; private final RabbitChannelPool channelPool; @@ -72,6 +73,11 @@ class RabbitClient { channelPool.execute(consumer); } + void nack(long deliveryTag) throws IOException { + RabbitChannelPool.RabbitConsumer<IOException> consumer = channel -> channel.basicNack(deliveryTag, !MULTIPLE, REQUEUE); + channelPool.execute(consumer); + } + Optional<GetResponse> poll(MailQueueName name) throws IOException { RabbitChannelPool.RabbitFunction<Optional<GetResponse>, IOException> f = channel -> Optional.ofNullable(channel.basicGet(name.toWorkQueueName().asString(), !AUTO_ACK)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
