MAILBOX-367 RabbitMQEvenBus should publish events

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b1e241c4
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b1e241c4
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b1e241c4

Branch: refs/heads/master
Commit: b1e241c4dc8e39174efa12eb400cd2a2f0cc633b
Parents: f1ac836
Author: datph <dphamho...@linagora.com>
Authored: Mon Jan 7 10:33:37 2019 +0700
Committer: Benoit Tellier <btell...@linagora.com>
Committed: Tue Jan 8 14:47:47 2019 +0700

----------------------------------------------------------------------
 .../backend/rabbitmq/RabbitMQExtension.java     |   9 +-
 mailbox/event/event-rabbitmq/pom.xml            | 101 ++++++++++++++++
 .../james/mailbox/events/RabbitMQEventBus.java  |  89 ++++++++++++++
 .../events/RabbitMQEventBusPublishingTest.java  | 121 +++++++++++++++++++
 mailbox/pom.xml                                 |   1 +
 pom.xml                                         |   7 +-
 6 files changed, 325 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/b1e241c4/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
----------------------------------------------------------------------
diff --git 
a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
 
b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
index d6d2ead..a64c95f 100644
--- 
a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
+++ 
b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
@@ -43,6 +43,7 @@ public class RabbitMQExtension implements BeforeAllCallback, 
BeforeEachCallback,
 
     private DockerRabbitMQ rabbitMQ;
     private SimpleChannelPool simpleChannelPool;
+    private RabbitMQConnectionFactory connectionFactory;
 
     @Override
     public void beforeAll(ExtensionContext context) {
@@ -52,7 +53,7 @@ public class RabbitMQExtension implements BeforeAllCallback, 
BeforeEachCallback,
 
     @Override
     public void beforeEach(ExtensionContext extensionContext) throws Exception 
{
-        RabbitMQConnectionFactory connectionFactory = 
createRabbitConnectionFactory();
+        connectionFactory = createRabbitConnectionFactory();
         this.simpleChannelPool = new SimpleChannelPool(connectionFactory);
     }
 
@@ -84,6 +85,10 @@ public class RabbitMQExtension implements BeforeAllCallback, 
BeforeEachCallback,
         return rabbitMQ;
     }
 
+    public RabbitMQConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
+
     private RabbitMQConnectionFactory createRabbitConnectionFactory() throws 
URISyntaxException {
         RabbitMQConfiguration rabbitMQConfiguration = 
RabbitMQConfiguration.builder()
             .amqpUri(rabbitMQ.amqpUri())
@@ -98,4 +103,4 @@ public class RabbitMQExtension implements BeforeAllCallback, 
BeforeEachCallback,
             rabbitMQConfiguration,
             new 
AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor(threadFactory)));
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/b1e241c4/mailbox/event/event-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/pom.xml 
b/mailbox/event/event-rabbitmq/pom.xml
new file mode 100644
index 0000000..c671468
--- /dev/null
+++ b/mailbox/event/event-rabbitmq/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>apache-james-mailbox</artifactId>
+        <groupId>org.apache.james</groupId>
+        <version>3.3.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>apache-james-mailbox-event-rabbitmq</artifactId>
+    <name>Apache James :: Mailbox :: Event :: RabbitMQ implementation</name>
+    <description>RabbitMQ implementation for the eventbus API</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>apache-james-backends-rabbitmq</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>apache-james-backends-rabbitmq</artifactId>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>apache-james-mailbox-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>apache-james-mailbox-api</artifactId>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>apache-james-mailbox-event-json</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor.rabbitmq</groupId>
+            <artifactId>reactor-rabbitmq</artifactId>
+            <version>1.0.0.RELEASE</version>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.platform</groupId>
+            <artifactId>junit-platform-launcher</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/b1e241c4/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
----------------------------------------------------------------------
diff --git 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
new file mode 100644
index 0000000..4ce792a
--- /dev/null
+++ 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -0,0 +1,89 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Set;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.MailboxListener;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.ExchangeSpecification;
+import reactor.rabbitmq.OutboundMessage;
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.Sender;
+import reactor.rabbitmq.SenderOptions;
+
+public class RabbitMQEventBus implements EventBus {
+    static final String MAILBOX_EVENT = "mailboxEvent";
+    static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + 
"-exchange";
+    static final String EMPTY_ROUTING_KEY = "";
+
+    private static final boolean DURABLE = true;
+    private static final String DIRECT_EXCHANGE = "direct";
+
+    private final EventSerializer eventSerializer;
+    private final Sender sender;
+
+    RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, 
EventSerializer eventSerializer) {
+        this.eventSerializer = eventSerializer;
+        SenderOptions senderOption = new 
SenderOptions().connectionMono(Mono.fromSupplier(rabbitMQConnectionFactory::create));
+        this.sender = RabbitFlux.createSender(senderOption);
+    }
+
+    Mono<Void> start() {
+        return 
sender.declareExchange(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME)
+                .durable(DURABLE)
+                .type(DIRECT_EXCHANGE))
+            .subscribeOn(Schedulers.elastic())
+            .then();
+    }
+
+    @Override
+    public Registration register(MailboxListener listener, RegistrationKey 
key) {
+        throw new NotImplementedException("will implement latter");
+    }
+
+    @Override
+    public Registration register(MailboxListener listener, Group group) {
+        throw new NotImplementedException("will implement latter");
+    }
+
+    @Override
+    public Mono<Void> dispatch(Event event, Set<RegistrationKey> key) {
+        Mono<OutboundMessage> outboundMessage = Mono.just(event)
+            .publishOn(Schedulers.parallel())
+            .map(this::serializeEvent)
+            .map(payload -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, 
EMPTY_ROUTING_KEY, payload));
+
+        Mono<Void> publishMono = sender.send(outboundMessage).cache();
+        publishMono.subscribe();
+        return publishMono;
+    }
+
+    private byte[] serializeEvent(Event event) {
+        return eventSerializer.toJson(event).getBytes(StandardCharsets.UTF_8);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/b1e241c4/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java
----------------------------------------------------------------------
diff --git 
a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java
 
b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java
new file mode 100644
index 0000000..50a37ca
--- /dev/null
+++ 
b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java
@@ -0,0 +1,121 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
+import static org.apache.james.mailbox.events.EventBusContract.EVENT;
+import static org.apache.james.mailbox.events.EventBusContract.NO_KEYS;
+import static 
org.apache.james.mailbox.events.RabbitMQEventBus.EMPTY_ROUTING_KEY;
+import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT;
+import static 
org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
+import org.apache.james.backend.rabbitmq.RabbitMQExtension;
+import org.apache.james.event.json.EventSerializer;
+import org.apache.james.mailbox.Event;
+import org.apache.james.mailbox.model.TestId;
+import org.apache.james.mailbox.model.TestMessageId;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableSet;
+
+import reactor.core.publisher.Mono;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.RabbitFlux;
+import reactor.rabbitmq.Receiver;
+import reactor.rabbitmq.ReceiverOptions;
+import reactor.rabbitmq.Sender;
+import reactor.rabbitmq.SenderOptions;
+
+class RabbitMQEventBusPublishingTest {
+    private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + 
"-workQueue";
+
+    @RegisterExtension
+    static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();
+
+    private RabbitMQEventBus eventBus;
+    private EventSerializer eventSerializer;
+    private RabbitMQConnectionFactory connectionFactory;
+
+    @BeforeEach
+    void setUp() {
+        connectionFactory = rabbitMQExtension.getConnectionFactory();
+
+        eventSerializer = new EventSerializer(new TestId.Factory(), new 
TestMessageId.Factory());
+        eventBus = new RabbitMQEventBus(connectionFactory, eventSerializer);
+        eventBus.start().block();
+
+        createQueue();
+    }
+
+    private void createQueue() {
+        SenderOptions senderOption = new SenderOptions()
+            .connectionMono(Mono.fromSupplier(connectionFactory::create));
+        Sender sender = RabbitFlux.createSender(senderOption);
+
+        sender.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME)
+            .durable(DURABLE)
+            .exclusive(!EXCLUSIVE)
+            .autoDelete(!AUTO_DELETE)
+            .arguments(NO_ARGUMENTS))
+            .block();
+        sender.bind(BindingSpecification.binding()
+            .exchange(MAILBOX_EVENT_EXCHANGE_NAME)
+            .queue(MAILBOX_WORK_QUEUE_NAME)
+            .routingKey(EMPTY_ROUTING_KEY))
+            .block();
+    }
+
+    @Test
+    void dispatchShouldPublishSerializedEventToRabbitMQ() {
+        eventBus.dispatch(EVENT, NO_KEYS).block();
+
+        assertThat(dequeueEvent()).isEqualTo(EVENT);
+    }
+
+    @Test
+    void dispatchShouldPublishSerializedEventToRabbitMQWhenNotBlocking() {
+        eventBus.dispatch(EVENT, NO_KEYS);
+
+        assertThat(dequeueEvent()).isEqualTo(EVENT);
+    }
+
+
+    private Event dequeueEvent() {
+        RabbitMQConnectionFactory connectionFactory = 
rabbitMQExtension.getConnectionFactory();
+        Receiver receiver = RabbitFlux.createReceiver(new 
ReceiverOptions().connectionMono(Mono.just(connectionFactory.create())));
+
+        byte[] eventInBytes = receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME)
+            .blockFirst()
+            .getBody();
+
+        return eventSerializer.fromJson(new String(eventInBytes, 
StandardCharsets.UTF_8))
+            .get();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/b1e241c4/mailbox/pom.xml
----------------------------------------------------------------------
diff --git a/mailbox/pom.xml b/mailbox/pom.xml
index f7f7869..e363b7e 100644
--- a/mailbox/pom.xml
+++ b/mailbox/pom.xml
@@ -42,6 +42,7 @@
         <module>elasticsearch</module>
 
         <module>event/event-memory</module>
+        <module>event/event-rabbitmq</module>
         <module>event/json</module>
 
         <module>jpa</module>

http://git-wip-us.apache.org/repos/asf/james-project/blob/b1e241c4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1cc4a2d..56486ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -762,6 +762,11 @@
             </dependency>
             <dependency>
                 <groupId>${james.groupId}</groupId>
+                <artifactId>apache-james-mailbox-event-json</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>${james.groupId}</groupId>
                 <artifactId>apache-james-mailbox-event-memory</artifactId>
                 <version>${project.version}</version>
             </dependency>
@@ -3376,4 +3381,4 @@
             -->
         </plugins>
     </reporting>
-</project>
+</project>
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to