This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch 3.7.x in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/3.7.x by this push: new f44d1efc37 JAMES-3919 RabbitMQMailQueue: clean up cassandra projection when we c… (#1609) (#1616) f44d1efc37 is described below commit f44d1efc37c26cfabfdc9dfe482698149d5caa78 Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Wed Jul 5 08:46:09 2023 +0700 JAMES-3919 RabbitMQMailQueue: clean up cassandra projection when we c… (#1609) (#1616) --- .../backends/rabbitmq/RabbitMQManagementAPI.java | 3 ++ .../org/apache/james/queue/rabbitmq/Dequeuer.java | 2 +- .../org/apache/james/queue/rabbitmq/Enqueuer.java | 13 ++++- .../james/queue/rabbitmq/RabbitMQMailQueue.java | 4 +- .../queue/rabbitmq/view/api/MailQueueView.java | 3 +- .../view/cassandra/CassandraMailQueueView.java | 12 ++--- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 58 ++++++++++++++-------- 7 files changed, 61 insertions(+), 34 deletions(-) diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java index 5661a4d314..826925ef1e 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java @@ -441,6 +441,9 @@ public interface RabbitMQManagementAPI { @RequestLine(value = "DELETE /api/queues/{vhost}/{name}", decodeSlash = false) void deleteQueue(@Param("vhost") String vhost, @Param("name") String name); + @RequestLine(value = "DELETE /api/queues/{vhost}/{name}/contents", decodeSlash = false) + void purgeQueue(@Param("vhost") String vhost, @Param("name") String name); + @RequestLine(value = "GET /api/exchanges/{vhost}/{name}/bindings/source", decodeSlash = false) List<BindingSource> listBindings(@Param("vhost") String vhost, @Param("name") String name); diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java index 573b7a2497..671400a06d 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java @@ -140,7 +140,7 @@ class Dequeuer { if (success) { dequeueMetric.increment(); response.ack(); - mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId(), mailWithEnqueueId.getBlobIds())); + Mono.from(mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId(), mailWithEnqueueId.getBlobIds()))).block(); } else { response.nack(REQUEUE); } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java index b72139c942..b6d97b74fc 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java @@ -25,6 +25,7 @@ import static org.apache.james.queue.api.MailQueue.ENQUEUED_METRIC_NAME_PREFIX; import java.time.Clock; import java.time.Duration; +import java.util.function.Function; import javax.mail.MessagingException; import javax.mail.internet.MimeMessage; @@ -34,6 +35,7 @@ import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.metrics.api.Metric; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.queue.api.MailQueue; +import org.apache.james.queue.rabbitmq.view.api.DeleteCondition; import org.apache.james.queue.rabbitmq.view.api.MailQueueView; import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser; import org.apache.mailet.Mail; @@ -79,11 +81,20 @@ class Enqueuer { return Flux.mergeDelayError(2, mailQueueView.storeMail(enqueuedItem), publishReferenceToRabbit(mailReference)) - .then(); + .then() + .onErrorResume(cleanupMailQueueView(enqueueId, mailReference)); }).sneakyThrow()) .thenEmpty(Mono.fromRunnable(enqueueMetric::increment)); } + private Function<Throwable, Mono<Void>> cleanupMailQueueView(EnqueueId enqueueId, MailReference mailReference) { + return (Throwable e) -> { + DeleteCondition.WithEnqueueId deleteCondition = DeleteCondition.withEnqueueId(enqueueId, mailReference.getPartsId()); + return Mono.from(mailQueueView.delete(deleteCondition)) + .thenReturn(Mono.<Void>error(e)); + }; + } + Mono<Void> reQueue(CassandraMailQueueBrowser.CassandraMailQueueItemView item) { Mail mail = item.getMail(); return Mono.fromCallable(() -> new MailReference(item.getEnqueuedId(), mail, item.getEnqueuedPartsId())) diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java index 602e679c33..f311d7bd9a 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java @@ -116,12 +116,12 @@ public class RabbitMQMailQueue implements ManageableMailQueue { @Override public long clear() { - return mailQueueView.delete(DeleteCondition.all()); + return Mono.from(mailQueueView.delete(DeleteCondition.all())).block(); } @Override public long remove(Type type, String value) { - return mailQueueView.delete(DeleteCondition.from(type, value)); + return Mono.from(mailQueueView.delete(DeleteCondition.from(type, value))).block(); } @Override diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java index 921a08bdcd..7afcf99fc7 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java @@ -25,6 +25,7 @@ import org.apache.james.queue.api.ManageableMailQueue; import org.apache.james.queue.rabbitmq.EnqueueId; import org.apache.james.queue.rabbitmq.EnqueuedItem; import org.apache.james.queue.rabbitmq.MailQueueName; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -39,7 +40,7 @@ public interface MailQueueView<V extends ManageableMailQueue.MailQueueItemView> Mono<Void> storeMail(EnqueuedItem enqueuedItem); - long delete(DeleteCondition deleteCondition); + Publisher<Long> delete(DeleteCondition deleteCondition); Mono<Boolean> isPresent(EnqueueId id); diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java index e76df88fa5..83270db79b 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java @@ -127,25 +127,23 @@ public class CassandraMailQueueView implements MailQueueView<CassandraMailQueueB } @Override - public long delete(DeleteCondition deleteCondition) { + public Mono<Long> delete(DeleteCondition deleteCondition) { if (deleteCondition instanceof DeleteCondition.WithEnqueueId) { DeleteCondition.WithEnqueueId enqueueIdCondition = (DeleteCondition.WithEnqueueId) deleteCondition; - delete(enqueueIdCondition.getEnqueueId(), enqueueIdCondition.getBlobIds()).block(); - return 1L; + return delete(enqueueIdCondition.getEnqueueId(), enqueueIdCondition.getBlobIds()) + .thenReturn(1L); } return browseThenDelete(deleteCondition); } - private long browseThenDelete(DeleteCondition deleteCondition) { + private Mono<Long> browseThenDelete(DeleteCondition deleteCondition) { return cassandraMailQueueBrowser.browseReferences(mailQueueName) .map(EnqueuedItemWithSlicingContext::getEnqueuedItem) .filter(deleteCondition::shouldBeDeleted) .flatMap(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getEnqueueId(), mailQueueName) .then(Mono.from(mimeMessageStore.delete(mailReference.getPartsId()))), DELETION_CONCURRENCY) .count() - .doOnNext(ignored -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName)) - .subscribeOn(Schedulers.elastic()) - .block(); + .doOnNext(ignored -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName)); } private Mono<Void> delete(EnqueueId enqueueId, diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index f291b1f46d..8fa34fa0d2 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -31,6 +31,7 @@ import static org.apache.mailet.base.MailAddressFixture.RECIPIENT1; import static org.apache.mailet.base.MailAddressFixture.SENDER; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; +import static org.awaitility.Awaitility.await; import static org.awaitility.Durations.TEN_SECONDS; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.never; @@ -40,6 +41,7 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicInteger; @@ -91,6 +93,7 @@ import org.mockito.ArgumentCaptor; import com.github.fge.lambdas.Throwing; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -376,7 +379,12 @@ class RabbitMQMailQueueTest { String name1 = "myMail1"; String name2 = "myMail2"; String name3 = "myMail3"; - Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(getMailQueue().deQueue()); + + List<MailQueue.MailQueueItem> receivedItem = new ArrayList<>(); + Flux.from(getMailQueue().deQueue()) + .doOnNext(receivedItem::add) + .subscribe(); + getMailQueue().enQueue(defaultMail() .name(name1) .build()); @@ -399,19 +407,16 @@ class RabbitMQMailQueueTest { .name(name3) .build()); - List<MailQueue.MailQueueItem> items = dequeueFlux.take(3).collectList().block(Duration.ofSeconds(10)); - - assertThat(items) - .extracting(item -> item.getMail().getName()) - .contains(name1, name3); + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> assertThat(receivedItem) + .extracting(item -> item.getMail().getName()) + .contains(name1, name3)); } @Test void enqueuedEmailsShouldNotBeLostDuringRabbitMQOutages() throws Exception { String name = "myMail"; - rabbitMQExtension.getRabbitMQ().pause(); - Thread.sleep(2000); try { getMailQueue().enQueue(defaultMail() @@ -420,8 +425,7 @@ class RabbitMQMailQueueTest { } catch (Exception e) { // Ignore } - rabbitMQExtension.getRabbitMQ().unpause(); - Thread.sleep(100); + rabbitMQExtension.managementAPI().purgeQueue("/", "JamesMailQueue-workqueue-spool"); getMailQueue().republishNotProcessedMails(clock.instant().plus(30, ChronoUnit.MINUTES)).blockLast(); @@ -642,14 +646,25 @@ class RabbitMQMailQueueTest { } private void dequeueMails(int times) { - Flux.from(getManageableMailQueue() - .deQueue()) - .take(times) - .flatMap(mailQueueItem -> Mono.fromCallable(() -> { - mailQueueItem.done(true); - return mailQueueItem; - })) - .blockLast(); + AtomicInteger counter = new AtomicInteger(0); + Disposable disposable = Flux.from(getManageableMailQueue() + .deQueue()) + .concatMap(mailQueueItem -> Mono.fromCallable(() -> { + if (counter.getAndIncrement() < times) { + mailQueueItem.done(true); + return mailQueueItem; + } else { + mailQueueItem.done(false); + return null; + } + }).subscribeOn(Schedulers.elastic())) + .subscribe(); + + try { + await().untilAsserted(() -> assertThat(counter.get()).isGreaterThanOrEqualTo(times)); + } finally { + disposable.dispose(); + } } @Test @@ -761,7 +776,7 @@ class RabbitMQMailQueueTest { .doOnNext(Throwing.consumer(item -> item.done(true))) .subscribe(); - Awaitility.await().atMost(TEN_SECONDS) + await().atMost(TEN_SECONDS) .untilAsserted(() -> assertThat(dequeuedMailNames) .containsExactly(name1, name2, name3)); } @@ -800,7 +815,7 @@ class RabbitMQMailQueueTest { .doOnNext(Throwing.consumer(item -> item.done(true))) .subscribe(); - Awaitility.await().atMost(TEN_SECONDS) + await().atMost(TEN_SECONDS) .untilAsserted(() -> assertThat(dequeuedMailNames) .containsExactly(name1, name2, name3)); } @@ -828,10 +843,9 @@ class RabbitMQMailQueueTest { .subscribe(); - Awaitility.await().atMost(TEN_SECONDS) + await().atMost(TEN_SECONDS) .untilAsserted(() -> assertThat(deadLetteredCount.get()).isEqualTo(1)); } - private void resumeDequeuing(Sender sender) { sender.bindQueue(getMailQueueBindingSpecification()).block(); } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org