This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1de4fa58380d288852b11d02fd6a6291ec38a665
Author: Rene Cordier <[email protected]>
AuthorDate: Fri Jul 10 11:25:01 2020 +0700

    JAMES-3305 Invalid EventBus messages need to be stored in dead letter queue
---
 .../james/mailbox/events/EventDispatcher.java      | 28 +++++++++++++++++--
 .../james/mailbox/events/GroupRegistration.java    |  5 ++--
 .../james/mailbox/events/RabbitMQEventBus.java     |  2 ++
 .../james/mailbox/events/RabbitMQEventBusTest.java | 32 +++++++++++++++++++++-
 4 files changed, 61 insertions(+), 6 deletions(-)

diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index c150184..999dc40 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -20,9 +20,15 @@
 package org.apache.james.mailbox.events;
 
 import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
+import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
 import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE;
 import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_DEAD_LETTER_QUEUE;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
 
 import java.nio.charset.StandardCharsets;
@@ -46,8 +52,10 @@ import com.rabbitmq.client.AMQP;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.MonoProcessor;
+import reactor.rabbitmq.BindingSpecification;
 import reactor.rabbitmq.ExchangeSpecification;
 import reactor.rabbitmq.OutboundMessage;
+import reactor.rabbitmq.QueueSpecification;
 import reactor.rabbitmq.Sender;
 import reactor.util.function.Tuples;
 
@@ -83,9 +91,23 @@ public class EventDispatcher {
     }
 
     void start() {
-        sender.declareExchange(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)
-            .durable(DURABLE)
-            .type(DIRECT_EXCHANGE))
+        Flux.concat(
+            sender.declareExchange(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)
+                .durable(DURABLE)
+                .type(DIRECT_EXCHANGE)),
+            sender.declareExchange(ExchangeSpecification.exchange(MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME)
+                .durable(DURABLE)
+                .type(DIRECT_EXCHANGE)),
+            sender.declareQueue(QueueSpecification.queue(MAILBOX_EVENT_DEAD_LETTER_QUEUE)
+                .durable(DURABLE)
+                .exclusive(!EXCLUSIVE)
+                .autoDelete(!AUTO_DELETE)
+                .arguments(NO_ARGUMENTS)),
+            sender.bind(BindingSpecification.binding()
+                .exchange(MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME)
+                .queue(MAILBOX_EVENT_DEAD_LETTER_QUEUE)
+                .routingKey(EMPTY_ROUTING_KEY)))
+            .then()
             .block();
     }
 
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index d95e6bd..5141ae5 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -23,9 +23,10 @@ import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
 import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
 import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
-import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS;
 import static org.apache.james.backends.rabbitmq.Constants.REQUEUE;
+import static org.apache.james.backends.rabbitmq.Constants.deadLetterQueue;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
 
 import java.nio.charset.StandardCharsets;
@@ -124,7 +125,7 @@ class GroupRegistration implements Registration {
                 .durable(DURABLE)
                 .exclusive(!EXCLUSIVE)
                 .autoDelete(!AUTO_DELETE)
-                .arguments(NO_ARGUMENTS)),
+                .arguments(deadLetterQueue(MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME))),
             sender.bind(BindingSpecification.binding()
                 .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
                 .queue(queueName.asString())
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index a73f1ed..29dcc6f 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -40,7 +40,9 @@ public class RabbitMQEventBus implements EventBus, Startable {
     private static final Set<RegistrationKey> NO_KEY = ImmutableSet.of();
     private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not running";
     static final String MAILBOX_EVENT = "mailboxEvent";
+    static final String MAILBOX_EVENT_DEAD_LETTER_QUEUE = MAILBOX_EVENT + "-dead-letter-queue";
     static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange";
+    static final String MAILBOX_EVENT_DEAD_LETTER_EXCHANGE_NAME = MAILBOX_EVENT + "-dead-letter-exchange";
     static final String EVENT_BUS_ID = "eventBusId";
 
     private final EventSerializer eventSerializer;
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 810187c..58d1979 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -37,6 +37,7 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.newAsyncListen
 import static org.apache.james.mailbox.events.EventBusTestFixture.newListener;
 import static org.apache.james.mailbox.events.GroupRegistration.WorkQueueName.MAILBOX_EVENT_WORK_QUEUE_PREFIX;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_DEAD_LETTER_QUEUE;
 import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
@@ -53,6 +54,7 @@ import java.io.Closeable;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
@@ -72,6 +74,7 @@ import org.apache.james.mailbox.util.EventCollector;
 import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.apache.james.util.concurrency.ConcurrentTestRunner;
 import org.assertj.core.data.Percentage;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -83,6 +86,7 @@ import org.mockito.stubbing.Answer;
 import com.google.common.collect.ImmutableSet;
 
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 import reactor.rabbitmq.BindingSpecification;
 import reactor.rabbitmq.ExchangeSpecification;
 import reactor.rabbitmq.OutboundMessage;
@@ -212,6 +216,31 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
     }
 
     @Test
+    void eventProcessingShouldStoreInvalidMessagesInDeadLetterQueue() {
+        EventCollector listener = new EventCollector();
+        EventBusTestFixture.GroupA registeredGroup = new EventBusTestFixture.GroupA();
+        eventBus.register(listener, registeredGroup);
+
+        String emptyRoutingKey = "";
+        rabbitMQExtension.getSender()
+            .send(Mono.just(new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME,
+                emptyRoutingKey,
+                "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8))))
+            .block();
+
+        AtomicInteger deadLetteredCount = new AtomicInteger();
+        rabbitMQExtension.getRabbitChannelPool()
+            .createReceiver()
+            .consumeAutoAck(MAILBOX_EVENT_DEAD_LETTER_QUEUE)
+            .doOnNext(next -> deadLetteredCount.incrementAndGet())
+            .subscribeOn(Schedulers.elastic())
+            .subscribe();
+
+        Awaitility.await().atMost(org.awaitility.Duration.TEN_SECONDS)
+            .untilAsserted(() -> assertThat(deadLetteredCount.get()).isEqualTo(1));
+    }
+
+    @Test
     void deserializeEventCollectorGroup() throws Exception {
         assertThat(Group.deserialize("org.apache.james.mailbox.util.EventCollector$EventCollectorGroup"))
             .isEqualTo(new EventCollector.EventCollectorGroup());
@@ -727,7 +756,8 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
                 eventBusWithKeyHandlerNotStarted.stop();
 
                 assertThat(rabbitManagementAPI.listQueues())
-                    .filteredOn(queue -> !queue.getName().startsWith(MAILBOX_EVENT_WORK_QUEUE_PREFIX))
+                    .filteredOn(queue -> !queue.getName().startsWith(MAILBOX_EVENT_WORK_QUEUE_PREFIX)
+                        && !queue.getName().startsWith(MAILBOX_EVENT_DEAD_LETTER_QUEUE))
                     .isEmpty();
             }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to