JAMES-2544 implement RabbitMQ dequeue with reactor-rabbitmq
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/97fa9683
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/97fa9683
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/97fa9683

Branch: refs/heads/master
Commit: 97fa9683a84d1c29eea7d0968eb5efa086696d52
Parents: df69a82
Author: Matthieu Baechler <matth...@apache.org>
Authored: Wed Dec 5 12:39:48 2018 +0100
Committer: Matthieu Baechler <matth...@apache.org>
Committed: Wed Feb 6 10:07:09 2019 +0100

----------------------------------------------------------------------
 backends-common/rabbitmq/pom.xml | 4 +
 .../backend/rabbitmq/RabbitMQChannelPool.java | 4 +
 .../backend/rabbitmq/SimpleChannelPool.java | 21 +++++-
 mailbox/event/event-rabbitmq/pom.xml | 1 -
 .../apache/james/mpt/smtp/ForwardSmtpTest.java | 3 +-
 pom.xml | 5 ++
 .../james/CassandraRabbitMQJamesServerTest.java | 6 +-
 .../apache/james/queue/rabbitmq/Dequeuer.java | 78 ++++++++------------
 .../james/queue/rabbitmq/RabbitClient.java | 23 +-----
 9 files changed, 70 insertions(+), 75 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/james-project/blob/97fa9683/backends-common/rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/backends-common/rabbitmq/pom.xml b/backends-common/rabbitmq/pom.xml
index 994fea7..51ecf1a 100644
--- a/backends-common/rabbitmq/pom.xml
+++ b/backends-common/rabbitmq/pom.xml
@@ -81,6 +81,10 @@
             <version>${feign.version}</version>
         </dependency>
         <dependency>
+            <groupId>io.projectreactor.rabbitmq</groupId>
+            <artifactId>reactor-rabbitmq</artifactId>
+        </dependency>
+        <dependency>
             <groupId>javax.inject</groupId>
             <artifactId>javax.inject</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/97fa9683/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java index 6547c58..f26efbd 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java @@ -20,6 +20,8 @@ package org.apache.james.backend.rabbitmq; import com.rabbitmq.client.Channel; +import reactor.core.publisher.Flux; +import reactor.rabbitmq.AcknowledgableDelivery; public interface RabbitMQChannelPool { class ConnectionFailedException extends RuntimeException { @@ -45,5 +47,7 @@ public interface RabbitMQChannelPool { <E extends Throwable> void execute(RabbitConsumer<E> f) throws E, ConnectionFailedException; + Flux<AcknowledgableDelivery> receive(String queueName); + void close() throws Exception; } http://git-wip-us.apache.org/repos/asf/james-project/blob/97fa9683/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java index e708118..0de3603 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java @@ -26,28 +26,43 @@ import javax.annotation.PreDestroy; import javax.inject.Inject; import 
com.github.fge.lambdas.Throwing; +import com.google.common.annotations.VisibleForTesting; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; +import reactor.core.publisher.Flux; +import reactor.rabbitmq.AcknowledgableDelivery; +import reactor.rabbitmq.RabbitFlux; +import reactor.rabbitmq.Receiver; +import reactor.rabbitmq.ReceiverOptions; public class SimpleChannelPool implements RabbitMQChannelPool { private final AtomicReference<Channel> channelReference; private final AtomicReference<Connection> connectionReference; private final RabbitMQConnectionFactory connectionFactory; + private final Receiver rabbitFlux; @Inject - public SimpleChannelPool(RabbitMQConnectionFactory factory) { + @VisibleForTesting + SimpleChannelPool(RabbitMQConnectionFactory factory) { this.connectionFactory = factory; this.connectionReference = new AtomicReference<>(); this.channelReference = new AtomicReference<>(); + this.rabbitFlux = RabbitFlux + .createReceiver(new ReceiverOptions().connectionMono(connectionFactory.connectionMono())); } @Override - public synchronized <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws E, ConnectionFailedException { + public Flux<AcknowledgableDelivery> receive(String queueName) { + return rabbitFlux.consumeManualAck(queueName); + } + + @Override + public synchronized <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws E, ConnectionFailedException { return f.execute(getResilientChannel()); } @Override - public synchronized <E extends Throwable> void execute(RabbitConsumer<E> f) throws E, ConnectionFailedException { + public synchronized <E extends Throwable> void execute(RabbitConsumer<E> f) throws E, ConnectionFailedException { f.execute(getResilientChannel()); } http://git-wip-us.apache.org/repos/asf/james-project/blob/97fa9683/mailbox/event/event-rabbitmq/pom.xml ---------------------------------------------------------------------- diff --git a/mailbox/event/event-rabbitmq/pom.xml 
b/mailbox/event/event-rabbitmq/pom.xml index 157be73..22026f1 100644 --- a/mailbox/event/event-rabbitmq/pom.xml +++ b/mailbox/event/event-rabbitmq/pom.xml @@ -82,7 +82,6 @@ <dependency> <groupId>io.projectreactor.rabbitmq</groupId> <artifactId>reactor-rabbitmq</artifactId> - <version>1.0.0.RELEASE</version> </dependency> <dependency> <groupId>nl.jqno.equalsverifier</groupId> http://git-wip-us.apache.org/repos/asf/james-project/blob/97fa9683/mpt/impl/smtp/core/src/main/java/org/apache/james/mpt/smtp/ForwardSmtpTest.java ---------------------------------------------------------------------- diff --git a/mpt/impl/smtp/core/src/main/java/org/apache/james/mpt/smtp/ForwardSmtpTest.java b/mpt/impl/smtp/core/src/main/java/org/apache/james/mpt/smtp/ForwardSmtpTest.java index 19f934c..ffc3fde 100644 --- a/mpt/impl/smtp/core/src/main/java/org/apache/james/mpt/smtp/ForwardSmtpTest.java +++ b/mpt/impl/smtp/core/src/main/java/org/apache/james/mpt/smtp/ForwardSmtpTest.java @@ -20,6 +20,7 @@ package org.apache.james.mpt.smtp; import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS; import static org.awaitility.Duration.ONE_MINUTE; +import static org.awaitility.Duration.TWO_MINUTES; import static org.hamcrest.Matchers.equalTo; import java.util.Locale; @@ -75,7 +76,7 @@ public abstract class ForwardSmtpTest { public void forwardingAnEmailShouldWork() throws Exception { scriptedTest.run("helo"); - calmlyAwait.atMost(ONE_MINUTE).untilAsserted(() -> + calmlyAwait.atMost(TWO_MINUTES).untilAsserted(() -> fakeSmtp.assertEmailReceived(response -> response .body("[0].from", equalTo("matth...@yopmail.com")) .body("[0].subject", equalTo("test")) http://git-wip-us.apache.org/repos/asf/james-project/blob/97fa9683/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 58ec929..76bdb35 100644 --- a/pom.xml +++ b/pom.xml @@ -2025,6 +2025,11 @@ <version>${netty.version}</version> </dependency> <dependency> + 
<groupId>io.projectreactor.rabbitmq</groupId> + <artifactId>reactor-rabbitmq</artifactId> + <version>1.0.0.RELEASE</version> + </dependency> + <dependency> <groupId>javax.activation</groupId> <artifactId>activation</artifactId> <version>${javax-activation.version}</version> http://git-wip-us.apache.org/repos/asf/james-project/blob/97fa9683/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQJamesServerTest.java ---------------------------------------------------------------------- diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQJamesServerTest.java b/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQJamesServerTest.java index 2ab287d..46bdb23 100644 --- a/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQJamesServerTest.java +++ b/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraRabbitMQJamesServerTest.java @@ -92,7 +92,7 @@ class CassandraRabbitMQJamesServerTest { .overrideWith(JmapJamesServerContract.DOMAIN_LIST_CONFIGURATION_MODULE); @Nested - @TestInstance(Lifecycle.PER_CLASS) + @TestInstance(Lifecycle.PER_METHOD) class WithEncryptedSwift implements ContractSuite { @RegisterExtension JamesServerExtension testExtension = baseExtensionBuilder() @@ -111,7 +111,7 @@ class CassandraRabbitMQJamesServerTest { } @Nested - @TestInstance(Lifecycle.PER_CLASS) + @TestInstance(Lifecycle.PER_METHOD) class WithDefaultSwift implements ContractSuite { @RegisterExtension JamesServerExtension testExtension = baseExtensionBuilder() @@ -129,7 +129,7 @@ class CassandraRabbitMQJamesServerTest { } @Nested - @TestInstance(Lifecycle.PER_CLASS) + @TestInstance(Lifecycle.PER_METHOD) class WithoutSwift implements ContractSuite { @RegisterExtension JamesServerExtension testExtension = baseExtensionBuilder().build(); 
http://git-wip-us.apache.org/repos/asf/james-project/blob/97fa9683/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java index 76e9838..104fed9 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java @@ -22,9 +22,7 @@ package org.apache.james.queue.rabbitmq; import static org.apache.james.queue.api.MailQueue.DEQUEUED_METRIC_NAME_PREFIX; import java.io.IOException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; import java.util.function.Function; @@ -33,17 +31,17 @@ import org.apache.james.metrics.api.MetricFactory; import org.apache.james.queue.api.MailQueue; import org.apache.james.queue.rabbitmq.view.api.DeleteCondition; import org.apache.james.queue.rabbitmq.view.api.MailQueueView; -import org.apache.james.util.concurrent.NamedThreadFactory; import org.apache.mailet.Mail; import com.github.fge.lambdas.Throwing; import com.github.fge.lambdas.consumers.ThrowingConsumer; -import com.nurkiewicz.asyncretry.AsyncRetryExecutor; -import com.rabbitmq.client.GetResponse; +import com.rabbitmq.client.Delivery; +import reactor.rabbitmq.AcknowledgableDelivery; class Dequeuer { - private static class NoMailYetException extends RuntimeException { - } + + private static final boolean REQUEUE = true; + private final LinkedBlockingQueue<AcknowledgableDelivery> messages; private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem { private final Consumer<Boolean> ack; 
@@ -65,10 +63,6 @@ class Dequeuer { } } - private static final int TEN_MS = 10; - - private final MailQueueName name; - private final RabbitClient rabbitClient; private final Function<MailReferenceDTO, Mail> mailLoader; private final Metric dequeueMetric; private final MailReferenceSerializer mailReferenceSerializer; @@ -77,48 +71,51 @@ class Dequeuer { Dequeuer(MailQueueName name, RabbitClient rabbitClient, Function<MailReferenceDTO, Mail> mailLoader, MailReferenceSerializer serializer, MetricFactory metricFactory, MailQueueView mailQueueView) { - this.name = name; - this.rabbitClient = rabbitClient; this.mailLoader = mailLoader; this.mailReferenceSerializer = serializer; this.mailQueueView = mailQueueView; this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString()); + this.messages = messageIterator(name, rabbitClient); + } + + private LinkedBlockingQueue<AcknowledgableDelivery> messageIterator(MailQueueName name, RabbitClient rabbitClient) { + LinkedBlockingQueue<AcknowledgableDelivery> dequeue = new LinkedBlockingQueue<>(1); + rabbitClient + .receive(name) + .filter(getResponse -> getResponse.getBody() != null) + .doOnNext(Throwing.consumer(dequeue::put)) + .subscribe(); + return dequeue; } - MailQueue.MailQueueItem deQueue() { - return pollChannel() - .thenApply(Throwing.function(this::loadItem).sneakyThrow()) - .join(); + MailQueue.MailQueueItem deQueue() throws MailQueue.MailQueueException, InterruptedException { + return loadItem(messages.take()); } - private RabbitMQMailQueueItem loadItem(GetResponse response) throws MailQueue.MailQueueException { + private RabbitMQMailQueueItem loadItem(AcknowledgableDelivery response) throws MailQueue.MailQueueException { Mail mail = loadMail(response); - ThrowingConsumer<Boolean> ack = ack(response.getEnvelope().getDeliveryTag(), mail); + ThrowingConsumer<Boolean> ack = ack(response, response.getEnvelope().getDeliveryTag(), mail); return new RabbitMQMailQueueItem(ack, mail); } - private 
ThrowingConsumer<Boolean> ack(long deliveryTag, Mail mail) { + private ThrowingConsumer<Boolean> ack(AcknowledgableDelivery response, long deliveryTag, Mail mail) { return success -> { - try { - if (success) { - dequeueMetric.increment(); - rabbitClient.ack(deliveryTag); - mailQueueView.delete(DeleteCondition.withName(mail.getName())); - } else { - rabbitClient.nack(deliveryTag); - } - } catch (IOException e) { - throw new MailQueue.MailQueueException("Failed to ACK " + mail.getName() + " with delivery tag " + deliveryTag, e); + if (success) { + dequeueMetric.increment(); + response.ack(); + mailQueueView.delete(DeleteCondition.withName(mail.getName())); + } else { + response.nack(REQUEUE); } }; } - private Mail loadMail(GetResponse response) throws MailQueue.MailQueueException { + private Mail loadMail(Delivery response) throws MailQueue.MailQueueException { MailReferenceDTO mailDTO = toMailReference(response); return mailLoader.apply(mailDTO); } - private MailReferenceDTO toMailReference(GetResponse getResponse) throws MailQueue.MailQueueException { + private MailReferenceDTO toMailReference(Delivery getResponse) throws MailQueue.MailQueueException { try { return mailReferenceSerializer.read(getResponse.getBody()); } catch (IOException e) { @@ -126,19 +123,4 @@ class Dequeuer { } } - private CompletableFuture<GetResponse> pollChannel() { - ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass()); - return new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor(threadFactory)) - .withFixedRate() - .withMinDelay(TEN_MS) - .retryOn(NoMailYetException.class) - .getWithRetry(this::singleChannelRead); - } - - private GetResponse singleChannelRead() throws IOException { - return rabbitClient.poll(name) - .filter(getResponse -> getResponse.getBody() != null) - .orElseThrow(NoMailYetException::new); - } - } 
http://git-wip-us.apache.org/repos/asf/james-project/blob/97fa9683/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java index d6d0618..4a19a8d 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java @@ -19,17 +19,13 @@ package org.apache.james.queue.rabbitmq; -import static org.apache.james.backend.rabbitmq.Constants.AUTO_ACK; import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE; import static org.apache.james.backend.rabbitmq.Constants.DURABLE; import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY; import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE; -import static org.apache.james.backend.rabbitmq.Constants.MULTIPLE; import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS; -import static org.apache.james.backend.rabbitmq.Constants.REQUEUE; import java.io.IOException; -import java.util.Optional; import javax.inject.Inject; @@ -37,7 +33,8 @@ import org.apache.james.backend.rabbitmq.RabbitMQChannelPool; import org.apache.james.queue.api.MailQueue; import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.GetResponse; +import reactor.core.publisher.Flux; +import reactor.rabbitmq.AcknowledgableDelivery; class RabbitClient { private final RabbitMQChannelPool channelPool; @@ -69,19 +66,7 @@ class RabbitClient { }); } - void ack(long deliveryTag) throws IOException { - RabbitMQChannelPool.RabbitConsumer<IOException> consumer = channel -> channel.basicAck(deliveryTag, !MULTIPLE); - channelPool.execute(consumer); - } - - void nack(long deliveryTag) 
throws IOException { - RabbitMQChannelPool.RabbitConsumer<IOException> consumer = channel -> channel.basicNack(deliveryTag, !MULTIPLE, REQUEUE); - channelPool.execute(consumer); - } - - Optional<GetResponse> poll(MailQueueName name) throws IOException { - RabbitMQChannelPool.RabbitFunction<Optional<GetResponse>, IOException> f = channel -> - Optional.ofNullable(channel.basicGet(name.toWorkQueueName().asString(), !AUTO_ACK)); - return channelPool.execute(f); + Flux<AcknowledgableDelivery> receive(MailQueueName name) { + return channelPool.receive(name.toWorkQueueName().asString()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org