This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch 3.7.x
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/3.7.x by this push:
     new f44d1efc37 JAMES-3919 RabbitMQMailQueue: clean up cassandra projection 
when we c… (#1609) (#1616)
f44d1efc37 is described below

commit f44d1efc37c26cfabfdc9dfe482698149d5caa78
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Wed Jul 5 08:46:09 2023 +0700

    JAMES-3919 RabbitMQMailQueue: clean up cassandra projection when we c… 
(#1609) (#1616)
---
 .../backends/rabbitmq/RabbitMQManagementAPI.java   |  3 ++
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  |  2 +-
 .../org/apache/james/queue/rabbitmq/Enqueuer.java  | 13 ++++-
 .../james/queue/rabbitmq/RabbitMQMailQueue.java    |  4 +-
 .../queue/rabbitmq/view/api/MailQueueView.java     |  3 +-
 .../view/cassandra/CassandraMailQueueView.java     | 12 ++---
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      | 58 ++++++++++++++--------
 7 files changed, 61 insertions(+), 34 deletions(-)

diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java
index 5661a4d314..826925ef1e 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java
@@ -441,6 +441,9 @@ public interface RabbitMQManagementAPI {
     @RequestLine(value = "DELETE /api/queues/{vhost}/{name}", decodeSlash = 
false)
     void deleteQueue(@Param("vhost") String vhost, @Param("name") String name);
 
+    @RequestLine(value = "DELETE /api/queues/{vhost}/{name}/contents", 
decodeSlash = false)
+    void purgeQueue(@Param("vhost") String vhost, @Param("name") String name);
+
     @RequestLine(value = "GET /api/exchanges/{vhost}/{name}/bindings/source", 
decodeSlash = false)
     List<BindingSource> listBindings(@Param("vhost") String vhost, 
@Param("name") String name);
 
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 573b7a2497..671400a06d 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -140,7 +140,7 @@ class Dequeuer {
             if (success) {
                 dequeueMetric.increment();
                 response.ack();
-                
mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId(),
 mailWithEnqueueId.getBlobIds()));
+                
Mono.from(mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId(),
 mailWithEnqueueId.getBlobIds()))).block();
             } else {
                 response.nack(REQUEUE);
             }
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
index b72139c942..b6d97b74fc 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
@@ -25,6 +25,7 @@ import static 
org.apache.james.queue.api.MailQueue.ENQUEUED_METRIC_NAME_PREFIX;
 
 import java.time.Clock;
 import java.time.Duration;
+import java.util.function.Function;
 
 import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
@@ -34,6 +35,7 @@ import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.metrics.api.Metric;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.queue.api.MailQueue;
+import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 import 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser;
 import org.apache.mailet.Mail;
@@ -79,11 +81,20 @@ class Enqueuer {
                 return Flux.mergeDelayError(2,
                         mailQueueView.storeMail(enqueuedItem),
                         publishReferenceToRabbit(mailReference))
-                        .then();
+                        .then()
+                        .onErrorResume(cleanupMailQueueView(enqueueId, 
mailReference));
             }).sneakyThrow())
             .thenEmpty(Mono.fromRunnable(enqueueMetric::increment));
     }
 
+    private Function<Throwable, Mono<Void>> cleanupMailQueueView(EnqueueId 
enqueueId, MailReference mailReference) {
+        return (Throwable e) -> { // error handler: delete this enqueueId's entry from the mail queue view, then fail with e
+            DeleteCondition.WithEnqueueId deleteCondition = 
DeleteCondition.withEnqueueId(enqueueId, mailReference.getPartsId());
+            return Mono.from(mailQueueView.delete(deleteCondition))
+                    .then(Mono.<Void>error(e)); // then(...) subscribes to the error Mono; thenReturn(...) would emit it as a value and swallow e
+        };
+    }
+
     Mono<Void> reQueue(CassandraMailQueueBrowser.CassandraMailQueueItemView 
item) {
         Mail mail = item.getMail();
         return Mono.fromCallable(() -> new MailReference(item.getEnqueuedId(), 
mail, item.getEnqueuedPartsId()))
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index 602e679c33..f311d7bd9a 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -116,12 +116,12 @@ public class RabbitMQMailQueue implements 
ManageableMailQueue {
 
     @Override
     public long clear() {
-        return mailQueueView.delete(DeleteCondition.all());
+        return Mono.from(mailQueueView.delete(DeleteCondition.all())).block();
     }
 
     @Override
     public long remove(Type type, String value) {
-        return mailQueueView.delete(DeleteCondition.from(type, value));
+        return Mono.from(mailQueueView.delete(DeleteCondition.from(type, 
value))).block();
     }
 
     @Override
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
index 921a08bdcd..7afcf99fc7 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
@@ -25,6 +25,7 @@ import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.queue.rabbitmq.EnqueueId;
 import org.apache.james.queue.rabbitmq.EnqueuedItem;
 import org.apache.james.queue.rabbitmq.MailQueueName;
+import org.reactivestreams.Publisher;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -39,7 +40,7 @@ public interface MailQueueView<V extends 
ManageableMailQueue.MailQueueItemView>
 
     Mono<Void> storeMail(EnqueuedItem enqueuedItem);
 
-    long delete(DeleteCondition deleteCondition);
+    Publisher<Long> delete(DeleteCondition deleteCondition);
 
     Mono<Boolean> isPresent(EnqueueId id);
 
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
index e76df88fa5..83270db79b 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
@@ -127,25 +127,23 @@ public class CassandraMailQueueView implements 
MailQueueView<CassandraMailQueueB
     }
 
     @Override
-    public long delete(DeleteCondition deleteCondition) {
+    public Mono<Long> delete(DeleteCondition deleteCondition) {
         if (deleteCondition instanceof DeleteCondition.WithEnqueueId) {
             DeleteCondition.WithEnqueueId enqueueIdCondition = 
(DeleteCondition.WithEnqueueId) deleteCondition;
-            delete(enqueueIdCondition.getEnqueueId(), 
enqueueIdCondition.getBlobIds()).block();
-            return 1L;
+            return delete(enqueueIdCondition.getEnqueueId(), 
enqueueIdCondition.getBlobIds())
+                .thenReturn(1L);
         }
         return browseThenDelete(deleteCondition);
     }
 
-    private long browseThenDelete(DeleteCondition deleteCondition) {
+    private Mono<Long> browseThenDelete(DeleteCondition deleteCondition) {
         return cassandraMailQueueBrowser.browseReferences(mailQueueName)
             .map(EnqueuedItemWithSlicingContext::getEnqueuedItem)
             .filter(deleteCondition::shouldBeDeleted)
             .flatMap(mailReference -> 
cassandraMailQueueMailDelete.considerDeleted(mailReference.getEnqueueId(), 
mailQueueName)
                 
.then(Mono.from(mimeMessageStore.delete(mailReference.getPartsId()))), 
DELETION_CONCURRENCY)
             .count()
-            .doOnNext(ignored -> 
cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName))
-            .subscribeOn(Schedulers.elastic())
-            .block();
+            .doOnNext(ignored -> 
cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName));
     }
 
     private Mono<Void> delete(EnqueueId enqueueId,
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index f291b1f46d..8fa34fa0d2 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -31,6 +31,7 @@ import static 
org.apache.mailet.base.MailAddressFixture.RECIPIENT1;
 import static org.apache.mailet.base.MailAddressFixture.SENDER;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.awaitility.Awaitility.await;
 import static org.awaitility.Durations.TEN_SECONDS;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.never;
@@ -40,6 +41,7 @@ import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -91,6 +93,7 @@ import org.mockito.ArgumentCaptor;
 
 import com.github.fge.lambdas.Throwing;
 
+import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -376,7 +379,12 @@ class RabbitMQMailQueueTest {
             String name1 = "myMail1";
             String name2 = "myMail2";
             String name3 = "myMail3";
-            Flux<MailQueue.MailQueueItem> dequeueFlux = 
Flux.from(getMailQueue().deQueue());
+
+            List<MailQueue.MailQueueItem> receivedItem = new ArrayList<>();
+            Flux.from(getMailQueue().deQueue())
+                    .doOnNext(receivedItem::add)
+                    .subscribe();
+
             getMailQueue().enQueue(defaultMail()
                 .name(name1)
                 .build());
@@ -399,19 +407,16 @@ class RabbitMQMailQueueTest {
                 .name(name3)
                 .build());
 
-            List<MailQueue.MailQueueItem> items = 
dequeueFlux.take(3).collectList().block(Duration.ofSeconds(10));
-
-            assertThat(items)
-                .extracting(item -> item.getMail().getName())
-                .contains(name1, name3);
+            await().atMost(Duration.ofSeconds(10))
+                    .untilAsserted(() -> assertThat(receivedItem)
+                            .extracting(item -> item.getMail().getName())
+                            .contains(name1, name3));
         }
 
         @Test
         void enqueuedEmailsShouldNotBeLostDuringRabbitMQOutages() throws 
Exception {
             String name = "myMail";
 
-            rabbitMQExtension.getRabbitMQ().pause();
-            Thread.sleep(2000);
 
             try {
                 getMailQueue().enQueue(defaultMail()
@@ -420,8 +425,7 @@ class RabbitMQMailQueueTest {
             } catch (Exception e) {
                 // Ignore
             }
-            rabbitMQExtension.getRabbitMQ().unpause();
-            Thread.sleep(100);
+            rabbitMQExtension.managementAPI().purgeQueue("/", 
"JamesMailQueue-workqueue-spool");
 
             getMailQueue().republishNotProcessedMails(clock.instant().plus(30, 
ChronoUnit.MINUTES)).blockLast();
 
@@ -642,14 +646,25 @@ class RabbitMQMailQueueTest {
         }
 
         private void dequeueMails(int times) {
-            Flux.from(getManageableMailQueue()
-                .deQueue())
-                .take(times)
-                .flatMap(mailQueueItem -> Mono.fromCallable(() -> {
-                    mailQueueItem.done(true);
-                    return mailQueueItem;
-                }))
-                .blockLast();
+            AtomicInteger counter = new AtomicInteger(0);
+            Disposable disposable = Flux.from(getManageableMailQueue()
+                            .deQueue())
+                    .concatMap(mailQueueItem -> Mono.fromCallable(() -> {
+                        if (counter.getAndIncrement() < times) {
+                            mailQueueItem.done(true);
+                            return mailQueueItem;
+                        } else {
+                            mailQueueItem.done(false);
+                            return null;
+                        }
+                    }).subscribeOn(Schedulers.elastic()))
+                    .subscribe();
+
+            try {
+                await().untilAsserted(() -> 
assertThat(counter.get()).isGreaterThanOrEqualTo(times));
+            } finally {
+                disposable.dispose();
+            }
         }
 
         @Test
@@ -761,7 +776,7 @@ class RabbitMQMailQueueTest {
                 .doOnNext(Throwing.consumer(item -> item.done(true)))
                 .subscribe();
 
-            Awaitility.await().atMost(TEN_SECONDS)
+            await().atMost(TEN_SECONDS)
                 .untilAsserted(() -> assertThat(dequeuedMailNames)
                     .containsExactly(name1, name2, name3));
         }
@@ -800,7 +815,7 @@ class RabbitMQMailQueueTest {
                 .doOnNext(Throwing.consumer(item -> item.done(true)))
                 .subscribe();
 
-            Awaitility.await().atMost(TEN_SECONDS)
+            await().atMost(TEN_SECONDS)
                 .untilAsserted(() -> assertThat(dequeuedMailNames)
                     .containsExactly(name1, name2, name3));
         }
@@ -828,10 +843,9 @@ class RabbitMQMailQueueTest {
                 .subscribe();
 
 
-            Awaitility.await().atMost(TEN_SECONDS)
+            await().atMost(TEN_SECONDS)
                 .untilAsserted(() -> 
assertThat(deadLetteredCount.get()).isEqualTo(1));
         }
-
         private void resumeDequeuing(Sender sender) {
             sender.bindQueue(getMailQueueBindingSpecification()).block();
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to