This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b6d0467bcf1642f1fec8f799615099e9a7e36eb2 Author: Benoit Tellier <[email protected]> AuthorDate: Wed Jul 1 16:29:06 2020 +0700 JAMES-3291 Badly formatted mailqueue causes RabbitMQMailQueue to crash --- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 41 ++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index ee32d8a..4b2bd9b 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -30,6 +30,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; import java.util.List; @@ -63,6 +64,7 @@ import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMai import org.apache.james.util.streams.Iterators; import org.apache.james.utils.UpdatableTickingClock; import org.apache.mailet.Mail; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -76,6 +78,7 @@ import com.github.fge.lambdas.Throwing; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.rabbitmq.OutboundMessage; class RabbitMQMailQueueTest { private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); @@ -378,6 +381,44 @@ class RabbitMQMailQueueTest { // We expect content missing blob references to be purged from the queue assertThat(dequeuedNames).isEmpty(); } + + @Disabled("JAMES-3291 Badly formatted mailqueue causes RabbitMQMailQueue to crash") + @Test + void dequeueShouldNotAbortProcessingUponSerializationIssuesErrors() throws Exception { + String name1 = "myMail1"; + String name2 = "myMail2"; + String name3 = "myMail3"; + + String emptyRoutingKey = ""; + rabbitMQExtension.getSender() + .send(Mono.just(new OutboundMessage("JamesMailQueue-exchange-spool", + emptyRoutingKey, + "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8)))) + .block(); + + getMailQueue().enQueue(defaultMail() + .name(name1) + .build()); + + getMailQueue().enQueue(defaultMail() + .name(name2) + .build()); + + getMailQueue().enQueue(defaultMail() + .name(name3) + .build()); + + ConcurrentLinkedDeque<String> dequeuedMailNames = new ConcurrentLinkedDeque<>(); + + Flux.from(getMailQueue().deQueue()) + .doOnNext(item -> dequeuedMailNames.add(item.getMail().getName())) + .doOnNext(Throwing.consumer(item -> item.done(true))) + .subscribe(); + + Awaitility.await().atMost(org.awaitility.Duration.TEN_SECONDS) + .untilAsserted(() -> assertThat(dequeuedMailNames) + .containsExactly(name1, name2, name3)); + } } @Nested --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
