JAMES-2544 implement RabbitMQ dequeue with reactor-rabbitmq

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/97fa9683
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/97fa9683
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/97fa9683

Branch: refs/heads/master
Commit: 97fa9683a84d1c29eea7d0968eb5efa086696d52
Parents: df69a82
Author: Matthieu Baechler <matth...@apache.org>
Authored: Wed Dec 5 12:39:48 2018 +0100
Committer: Matthieu Baechler <matth...@apache.org>
Committed: Wed Feb 6 10:07:09 2019 +0100

----------------------------------------------------------------------
 backends-common/rabbitmq/pom.xml                |  4 +
 .../backend/rabbitmq/RabbitMQChannelPool.java   |  4 +
 .../backend/rabbitmq/SimpleChannelPool.java     | 21 +++++-
 mailbox/event/event-rabbitmq/pom.xml            |  1 -
 .../apache/james/mpt/smtp/ForwardSmtpTest.java  |  3 +-
 pom.xml                                         |  5 ++
 .../james/CassandraRabbitMQJamesServerTest.java |  6 +-
 .../apache/james/queue/rabbitmq/Dequeuer.java   | 78 ++++++++------------
 .../james/queue/rabbitmq/RabbitClient.java      | 23 +-----
 9 files changed, 70 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/97fa9683/backends-common/rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/pom.xml b/backends-common/rabbitmq/pom.xml
index 994fea7..51ecf1a 100644
--- a/backends-common/rabbitmq/pom.xml
+++ b/backends-common/rabbitmq/pom.xml
@@ -81,6 +81,10 @@
             <version>${feign.version}</version>
         </dependency>
         <dependency>
+            <groupId>io.projectreactor.rabbitmq</groupId>
+            <artifactId>reactor-rabbitmq</artifactId>
+        </dependency>
+        <dependency>
             <groupId>javax.inject</groupId>
             <artifactId>javax.inject</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/97fa9683/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java
----------------------------------------------------------------------
diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java
index 6547c58..f26efbd 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java
@@ -20,6 +20,8 @@
 package org.apache.james.backend.rabbitmq;
 
 import com.rabbitmq.client.Channel;
+import reactor.core.publisher.Flux;
+import reactor.rabbitmq.AcknowledgableDelivery;
 
 public interface RabbitMQChannelPool {
     class ConnectionFailedException extends RuntimeException {
@@ -45,5 +47,7 @@ public interface RabbitMQChannelPool {
     <E extends Throwable> void execute(RabbitConsumer<E> f)
         throws E, ConnectionFailedException;
 
+    Flux<AcknowledgableDelivery> receive(String queueName);
+
     void close() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/97fa9683/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
----------------------------------------------------------------------
diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
index e708118..0de3603 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
@@ -26,28 +26,43 @@ import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 
 import com.github.fge.lambdas.Throwing;
+import com.google.common.annotations.VisibleForTesting;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
+import reactor.core.publisher.Flux;
+import reactor.rabbitmq.AcknowledgableDelivery;
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.Receiver;
+import reactor.rabbitmq.ReceiverOptions;
 
 public class SimpleChannelPool implements RabbitMQChannelPool {
     private final AtomicReference<Channel> channelReference;
     private final AtomicReference<Connection> connectionReference;
     private final RabbitMQConnectionFactory connectionFactory;
+    private final Receiver rabbitFlux;
 
     @Inject
-    public SimpleChannelPool(RabbitMQConnectionFactory factory) {
+    @VisibleForTesting
+    SimpleChannelPool(RabbitMQConnectionFactory factory) {
         this.connectionFactory = factory;
         this.connectionReference = new AtomicReference<>();
         this.channelReference = new AtomicReference<>();
+        this.rabbitFlux = RabbitFlux
+            .createReceiver(new 
ReceiverOptions().connectionMono(connectionFactory.connectionMono()));
     }
 
     @Override
-    public synchronized  <T, E extends Throwable> T execute(RabbitFunction<T, 
E> f) throws E, ConnectionFailedException {
+    public Flux<AcknowledgableDelivery> receive(String queueName) {
+        return rabbitFlux.consumeManualAck(queueName);
+    }
+
+    @Override
+    public synchronized <T, E extends Throwable> T execute(RabbitFunction<T, 
E> f) throws E, ConnectionFailedException {
         return f.execute(getResilientChannel());
     }
 
     @Override
-    public synchronized  <E extends Throwable> void execute(RabbitConsumer<E> 
f) throws E, ConnectionFailedException {
+    public synchronized <E extends Throwable> void execute(RabbitConsumer<E> 
f) throws E, ConnectionFailedException {
         f.execute(getResilientChannel());
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/97fa9683/mailbox/event/event-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/pom.xml 
b/mailbox/event/event-rabbitmq/pom.xml
index 157be73..22026f1 100644
--- a/mailbox/event/event-rabbitmq/pom.xml
+++ b/mailbox/event/event-rabbitmq/pom.xml
@@ -82,7 +82,6 @@
         <dependency>
             <groupId>io.projectreactor.rabbitmq</groupId>
             <artifactId>reactor-rabbitmq</artifactId>
-            <version>1.0.0.RELEASE</version>
         </dependency>
         <dependency>
             <groupId>nl.jqno.equalsverifier</groupId>

http://git-wip-us.apache.org/repos/asf/james-project/blob/97fa9683/mpt/impl/smtp/core/src/main/java/org/apache/james/mpt/smtp/ForwardSmtpTest.java
----------------------------------------------------------------------
diff --git 
a/mpt/impl/smtp/core/src/main/java/org/apache/james/mpt/smtp/ForwardSmtpTest.java
 
b/mpt/impl/smtp/core/src/main/java/org/apache/james/mpt/smtp/ForwardSmtpTest.java
index 19f934c..ffc3fde 100644
--- 
a/mpt/impl/smtp/core/src/main/java/org/apache/james/mpt/smtp/ForwardSmtpTest.java
+++ 
b/mpt/impl/smtp/core/src/main/java/org/apache/james/mpt/smtp/ForwardSmtpTest.java
@@ -20,6 +20,7 @@ package org.apache.james.mpt.smtp;
 
 import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS;
 import static org.awaitility.Duration.ONE_MINUTE;
+import static org.awaitility.Duration.TWO_MINUTES;
 import static org.hamcrest.Matchers.equalTo;
 
 import java.util.Locale;
@@ -75,7 +76,7 @@ public abstract class ForwardSmtpTest {
     public void forwardingAnEmailShouldWork() throws Exception {
         scriptedTest.run("helo");
 
-        calmlyAwait.atMost(ONE_MINUTE).untilAsserted(() ->
+        calmlyAwait.atMost(TWO_MINUTES).untilAsserted(() ->
             fakeSmtp.assertEmailReceived(response -> response
                 .body("[0].from", equalTo("matth...@yopmail.com"))
                 .body("[0].subject", equalTo("test"))

http://git-wip-us.apache.org/repos/asf/james-project/blob/97fa9683/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 58ec929..76bdb35 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2025,6 +2025,11 @@
                 <version>${netty.version}</version>
             </dependency>
             <dependency>
+                <groupId>io.projectreactor.rabbitmq</groupId>
+                <artifactId>reactor-rabbitmq</artifactId>
+                <version>1.0.0.RELEASE</version>
+            </dependency>
+            <dependency>
                 <groupId>javax.activation</groupId>
                 <artifactId>activation</artifactId>
                 <version>${javax-activation.version}</version>

http://git-wip-us.apache.org/repos/asf/james-project/blob/97fa9683/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQJamesServerTest.java
----------------------------------------------------------------------
diff --git 
a/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQJamesServerTest.java
 
b/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQJamesServerTest.java
index 2ab287d..46bdb23 100644
--- 
a/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQJamesServerTest.java
+++ 
b/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQJamesServerTest.java
@@ -92,7 +92,7 @@ class CassandraRabbitMQJamesServerTest {
             
.overrideWith(JmapJamesServerContract.DOMAIN_LIST_CONFIGURATION_MODULE);
 
     @Nested
-    @TestInstance(Lifecycle.PER_CLASS)
+    @TestInstance(Lifecycle.PER_METHOD)
     class WithEncryptedSwift implements ContractSuite {
         @RegisterExtension
         JamesServerExtension testExtension = baseExtensionBuilder()
@@ -111,7 +111,7 @@ class CassandraRabbitMQJamesServerTest {
     }
 
     @Nested
-    @TestInstance(Lifecycle.PER_CLASS)
+    @TestInstance(Lifecycle.PER_METHOD)
     class WithDefaultSwift implements ContractSuite {
         @RegisterExtension
         JamesServerExtension testExtension = baseExtensionBuilder()
@@ -129,7 +129,7 @@ class CassandraRabbitMQJamesServerTest {
     }
 
     @Nested
-    @TestInstance(Lifecycle.PER_CLASS)
+    @TestInstance(Lifecycle.PER_METHOD)
     class WithoutSwift implements ContractSuite {
         @RegisterExtension
         JamesServerExtension testExtension = baseExtensionBuilder().build();

http://git-wip-us.apache.org/repos/asf/james-project/blob/97fa9683/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 76e9838..104fed9 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -22,9 +22,7 @@ package org.apache.james.queue.rabbitmq;
 import static org.apache.james.queue.api.MailQueue.DEQUEUED_METRIC_NAME_PREFIX;
 
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -33,17 +31,17 @@ import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
-import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.apache.mailet.Mail;
 
 import com.github.fge.lambdas.Throwing;
 import com.github.fge.lambdas.consumers.ThrowingConsumer;
-import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
-import com.rabbitmq.client.GetResponse;
+import com.rabbitmq.client.Delivery;
+import reactor.rabbitmq.AcknowledgableDelivery;
 
 class Dequeuer {
-    private static class NoMailYetException extends RuntimeException {
-    }
+
+    private static final boolean REQUEUE = true;
+    private final LinkedBlockingQueue<AcknowledgableDelivery> messages;
 
     private static class RabbitMQMailQueueItem implements 
MailQueue.MailQueueItem {
         private final Consumer<Boolean> ack;
@@ -65,10 +63,6 @@ class Dequeuer {
         }
     }
 
-    private static final int TEN_MS = 10;
-
-    private final MailQueueName name;
-    private final RabbitClient rabbitClient;
     private final Function<MailReferenceDTO, Mail> mailLoader;
     private final Metric dequeueMetric;
     private final MailReferenceSerializer mailReferenceSerializer;
@@ -77,48 +71,51 @@ class Dequeuer {
     Dequeuer(MailQueueName name, RabbitClient rabbitClient, 
Function<MailReferenceDTO, Mail> mailLoader,
              MailReferenceSerializer serializer, MetricFactory metricFactory,
              MailQueueView mailQueueView) {
-        this.name = name;
-        this.rabbitClient = rabbitClient;
         this.mailLoader = mailLoader;
         this.mailReferenceSerializer = serializer;
         this.mailQueueView = mailQueueView;
         this.dequeueMetric = 
metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString());
+        this.messages = messageIterator(name, rabbitClient);
+    }
+
+    private LinkedBlockingQueue<AcknowledgableDelivery> 
messageIterator(MailQueueName name, RabbitClient rabbitClient) {
+        LinkedBlockingQueue<AcknowledgableDelivery> dequeue = new 
LinkedBlockingQueue<>(1);
+        rabbitClient
+            .receive(name)
+            .filter(getResponse -> getResponse.getBody() != null)
+            .doOnNext(Throwing.consumer(dequeue::put))
+            .subscribe();
+        return dequeue;
     }
 
-    MailQueue.MailQueueItem deQueue() {
-        return pollChannel()
-            .thenApply(Throwing.function(this::loadItem).sneakyThrow())
-            .join();
+    MailQueue.MailQueueItem deQueue() throws MailQueue.MailQueueException, 
InterruptedException {
+        return loadItem(messages.take());
     }
 
-    private RabbitMQMailQueueItem loadItem(GetResponse response) throws 
MailQueue.MailQueueException {
+    private RabbitMQMailQueueItem loadItem(AcknowledgableDelivery response) 
throws MailQueue.MailQueueException {
         Mail mail = loadMail(response);
-        ThrowingConsumer<Boolean> ack = 
ack(response.getEnvelope().getDeliveryTag(), mail);
+        ThrowingConsumer<Boolean> ack = ack(response, 
response.getEnvelope().getDeliveryTag(), mail);
         return new RabbitMQMailQueueItem(ack, mail);
     }
 
-    private ThrowingConsumer<Boolean> ack(long deliveryTag, Mail mail) {
+    private ThrowingConsumer<Boolean> ack(AcknowledgableDelivery response, 
long deliveryTag, Mail mail) {
         return success -> {
-            try {
-                if (success) {
-                    dequeueMetric.increment();
-                    rabbitClient.ack(deliveryTag);
-                    
mailQueueView.delete(DeleteCondition.withName(mail.getName()));
-                } else {
-                    rabbitClient.nack(deliveryTag);
-                }
-            } catch (IOException e) {
-                throw new MailQueue.MailQueueException("Failed to ACK " + 
mail.getName() + " with delivery tag " + deliveryTag, e);
+            if (success) {
+                dequeueMetric.increment();
+                response.ack();
+                mailQueueView.delete(DeleteCondition.withName(mail.getName()));
+            } else {
+                response.nack(REQUEUE);
             }
         };
     }
 
-    private Mail loadMail(GetResponse response) throws 
MailQueue.MailQueueException {
+    private Mail loadMail(Delivery response) throws 
MailQueue.MailQueueException {
         MailReferenceDTO mailDTO = toMailReference(response);
         return mailLoader.apply(mailDTO);
     }
 
-    private MailReferenceDTO toMailReference(GetResponse getResponse) throws 
MailQueue.MailQueueException {
+    private MailReferenceDTO toMailReference(Delivery getResponse) throws 
MailQueue.MailQueueException {
         try {
             return mailReferenceSerializer.read(getResponse.getBody());
         } catch (IOException e) {
@@ -126,19 +123,4 @@ class Dequeuer {
         }
     }
 
-    private CompletableFuture<GetResponse> pollChannel() {
-        ThreadFactory threadFactory = 
NamedThreadFactory.withClassName(getClass());
-        return new 
AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor(threadFactory))
-            .withFixedRate()
-            .withMinDelay(TEN_MS)
-            .retryOn(NoMailYetException.class)
-            .getWithRetry(this::singleChannelRead);
-    }
-
-    private GetResponse singleChannelRead() throws IOException {
-        return rabbitClient.poll(name)
-            .filter(getResponse -> getResponse.getBody() != null)
-            .orElseThrow(NoMailYetException::new);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/97fa9683/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
index d6d0618..4a19a8d 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
@@ -19,17 +19,13 @@
 
 package org.apache.james.queue.rabbitmq;
 
-import static org.apache.james.backend.rabbitmq.Constants.AUTO_ACK;
 import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
 import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY;
 import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
-import static org.apache.james.backend.rabbitmq.Constants.MULTIPLE;
 import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
-import static org.apache.james.backend.rabbitmq.Constants.REQUEUE;
 
 import java.io.IOException;
-import java.util.Optional;
 
 import javax.inject.Inject;
 
@@ -37,7 +33,8 @@ import org.apache.james.backend.rabbitmq.RabbitMQChannelPool;
 import org.apache.james.queue.api.MailQueue;
 
 import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.GetResponse;
+import reactor.core.publisher.Flux;
+import reactor.rabbitmq.AcknowledgableDelivery;
 
 class RabbitClient {
     private final RabbitMQChannelPool channelPool;
@@ -69,19 +66,7 @@ class RabbitClient {
         });
     }
 
-    void ack(long deliveryTag) throws IOException {
-        RabbitMQChannelPool.RabbitConsumer<IOException> consumer = channel -> 
channel.basicAck(deliveryTag, !MULTIPLE);
-        channelPool.execute(consumer);
-    }
-
-    void nack(long deliveryTag) throws IOException {
-        RabbitMQChannelPool.RabbitConsumer<IOException> consumer = channel -> 
channel.basicNack(deliveryTag, !MULTIPLE, REQUEUE);
-        channelPool.execute(consumer);
-    }
-
-    Optional<GetResponse> poll(MailQueueName name) throws IOException {
-        RabbitMQChannelPool.RabbitFunction<Optional<GetResponse>, IOException> 
f = channel ->
-            
Optional.ofNullable(channel.basicGet(name.toWorkQueueName().asString(), 
!AUTO_ACK));
-        return channelPool.execute(f);
+    Flux<AcknowledgableDelivery> receive(MailQueueName name) {
+        return channelPool.receive(name.toWorkQueueName().asString());
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to