This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 9f0a943fd2d07ae9d0b26a35ce1293cd0f325efc Author: Benoit Tellier <[email protected]> AuthorDate: Fri Jul 3 09:31:13 2020 +0700 JAMES-3290 Many invalid JSON should not abort dequeue --- .../org/apache/james/queue/rabbitmq/Dequeuer.java | 14 ++++---- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 39 ++++++++++++++++++++++ 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java index bacd1ff..fe209f9 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java @@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory; import com.github.fge.lambdas.Throwing; import com.github.fge.lambdas.consumers.ThrowingConsumer; -import com.rabbitmq.client.Delivery; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -139,26 +138,27 @@ class Dequeuer implements Closeable { }; } - private Mono<MailWithEnqueueId> loadMail(AcknowledgableDelivery response) { - return toMailReference(response) + private Mono<MailWithEnqueueId> loadMail(AcknowledgableDelivery delivery) { + return toMailReference(delivery) .flatMap(reference -> mailLoader.load(reference) .onErrorResume(ObjectNotFoundException.class, e -> { LOGGER.error("Fail to load mail {} with enqueueId {} as underlying blobs do not exist. Discarding this message to prevent an infinite loop.", reference.getName(), reference.getEnqueueId(), e); - response.nack(!REQUEUE); + delivery.nack(!REQUEUE); return Mono.empty(); }) .onErrorResume(e -> { LOGGER.error("Fail to load mail {} with enqueueId {}", reference.getName(), reference.getEnqueueId(), e); - response.nack(REQUEUE); + delivery.nack(REQUEUE); return Mono.empty(); })); } - private Mono<MailReferenceDTO> toMailReference(Delivery getResponse) { - return Mono.fromCallable(getResponse::getBody) + private Mono<MailReferenceDTO> toMailReference(AcknowledgableDelivery delivery) { + return Mono.fromCallable(delivery::getBody) .map(Throwing.function(mailReferenceSerializer::read).sneakyThrow()) .onErrorResume(e -> { LOGGER.error("Fail to deserialize MailReferenceDTO. Discarding this message to prevent an infinite loop.", e); + delivery.nack(!REQUEUE); return Mono.empty(); }); } diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index abae166..0289266 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -418,6 +418,45 @@ class RabbitMQMailQueueTest { .untilAsserted(() -> assertThat(dequeuedMailNames) .containsExactly(name1, name2, name3)); } + + @Test + void manyInvalidMessagesShouldNotAbortProcessing() throws Exception { + String name1 = "myMail1"; + String name2 = "myMail2"; + String name3 = "myMail3"; + + String emptyRoutingKey = ""; + + IntStream.range(0, 100) + .forEach(i -> rabbitMQExtension.getSender() + .send(Mono.just(new OutboundMessage("JamesMailQueue-exchange-spool", + emptyRoutingKey, + ("BAD_PAYLOAD " + i).getBytes(StandardCharsets.UTF_8)))) + .block()); + + getMailQueue().enQueue(defaultMail() + .name(name1) + .build()); + + getMailQueue().enQueue(defaultMail() + .name(name2) + .build()); + + getMailQueue().enQueue(defaultMail() + .name(name3) + .build()); + + ConcurrentLinkedDeque<String> dequeuedMailNames = new ConcurrentLinkedDeque<>(); + + Flux.from(getMailQueue().deQueue()) + .doOnNext(item -> dequeuedMailNames.add(item.getMail().getName())) + .doOnNext(Throwing.consumer(item -> item.done(true))) + .subscribe(); + + Awaitility.await().atMost(org.awaitility.Duration.TEN_SECONDS) + .untilAsserted(() -> assertThat(dequeuedMailNames) + .containsExactly(name1, name2, name3)); + } } @Nested --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
