MAILBOX-367 RabbitMQEventBus should publish events
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b1e241c4 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b1e241c4 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b1e241c4 Branch: refs/heads/master Commit: b1e241c4dc8e39174efa12eb400cd2a2f0cc633b Parents: f1ac836 Author: datph <dphamho...@linagora.com> Authored: Mon Jan 7 10:33:37 2019 +0700 Committer: Benoit Tellier <btell...@linagora.com> Committed: Tue Jan 8 14:47:47 2019 +0700 ---------------------------------------------------------------------- .../backend/rabbitmq/RabbitMQExtension.java | 9 +- mailbox/event/event-rabbitmq/pom.xml | 101 ++++++++++++++++ .../james/mailbox/events/RabbitMQEventBus.java | 89 ++++++++++++++ .../events/RabbitMQEventBusPublishingTest.java | 121 +++++++++++++++++++ mailbox/pom.xml | 1 + pom.xml | 7 +- 6 files changed, 325 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/b1e241c4/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java index d6d2ead..a64c95f 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java @@ -43,6 +43,7 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback, private DockerRabbitMQ rabbitMQ; private SimpleChannelPool simpleChannelPool; + private RabbitMQConnectionFactory connectionFactory; @Override public void beforeAll(ExtensionContext context) 
{ @@ -52,7 +53,7 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback, @Override public void beforeEach(ExtensionContext extensionContext) throws Exception { - RabbitMQConnectionFactory connectionFactory = createRabbitConnectionFactory(); + connectionFactory = createRabbitConnectionFactory(); this.simpleChannelPool = new SimpleChannelPool(connectionFactory); } @@ -84,6 +85,10 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback, return rabbitMQ; } + public RabbitMQConnectionFactory getConnectionFactory() { + return connectionFactory; + } + private RabbitMQConnectionFactory createRabbitConnectionFactory() throws URISyntaxException { RabbitMQConfiguration rabbitMQConfiguration = RabbitMQConfiguration.builder() .amqpUri(rabbitMQ.amqpUri()) @@ -98,4 +103,4 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback, rabbitMQConfiguration, new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor(threadFactory))); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/b1e241c4/mailbox/event/event-rabbitmq/pom.xml ---------------------------------------------------------------------- diff --git a/mailbox/event/event-rabbitmq/pom.xml b/mailbox/event/event-rabbitmq/pom.xml new file mode 100644 index 0000000..c671468 --- /dev/null +++ b/mailbox/event/event-rabbitmq/pom.xml @@ -0,0 +1,101 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>apache-james-mailbox</artifactId> + <groupId>org.apache.james</groupId> + <version>3.3.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>apache-james-mailbox-event-rabbitmq</artifactId> + <name>Apache James :: Mailbox :: Event :: RabbitMQ implementation</name> + <description>RabbitMQ implementation for the eventbus API</description> + + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>apache-james-backends-rabbitmq</artifactId> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>apache-james-backends-rabbitmq</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>apache-james-mailbox-api</artifactId> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>apache-james-mailbox-api</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>apache-james-mailbox-event-json</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-server-testing</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.jayway.awaitility</groupId> 
+ <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-core</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor.rabbitmq</groupId> + <artifactId>reactor-rabbitmq</artifactId> + <version>1.0.0.RELEASE</version> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-engine</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.platform</groupId> + <artifactId>junit-platform-launcher</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/b1e241c4/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java ---------------------------------------------------------------------- diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java new file mode 100644 index 0000000..4ce792a --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java @@ -0,0 +1,89 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. 
The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events; + +import java.nio.charset.StandardCharsets; +import java.util.Set; + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory; +import org.apache.james.event.json.EventSerializer; +import org.apache.james.mailbox.Event; +import org.apache.james.mailbox.MailboxListener; + +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.rabbitmq.ExchangeSpecification; +import reactor.rabbitmq.OutboundMessage; +import reactor.rabbitmq.RabbitFlux; +import reactor.rabbitmq.Sender; +import reactor.rabbitmq.SenderOptions; + +public class RabbitMQEventBus implements EventBus { + static final String MAILBOX_EVENT = "mailboxEvent"; + static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange"; + static final String EMPTY_ROUTING_KEY = ""; + + private static final boolean DURABLE = true; + private static final String DIRECT_EXCHANGE = "direct"; + + private final EventSerializer eventSerializer; + private final Sender sender; + + RabbitMQEventBus(RabbitMQConnectionFactory rabbitMQConnectionFactory, EventSerializer eventSerializer) { + this.eventSerializer = eventSerializer; + SenderOptions senderOption = new 
SenderOptions().connectionMono(Mono.fromSupplier(rabbitMQConnectionFactory::create)); + this.sender = RabbitFlux.createSender(senderOption); + } + + Mono<Void> start() { + return sender.declareExchange(ExchangeSpecification.exchange(MAILBOX_EVENT_EXCHANGE_NAME) + .durable(DURABLE) + .type(DIRECT_EXCHANGE)) + .subscribeOn(Schedulers.elastic()) + .then(); + } + + @Override + public Registration register(MailboxListener listener, RegistrationKey key) { + throw new NotImplementedException("will implement latter"); + } + + @Override + public Registration register(MailboxListener listener, Group group) { + throw new NotImplementedException("will implement latter"); + } + + @Override + public Mono<Void> dispatch(Event event, Set<RegistrationKey> key) { + Mono<OutboundMessage> outboundMessage = Mono.just(event) + .publishOn(Schedulers.parallel()) + .map(this::serializeEvent) + .map(payload -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, EMPTY_ROUTING_KEY, payload)); + + Mono<Void> publishMono = sender.send(outboundMessage).cache(); + publishMono.subscribe(); + return publishMono; + } + + private byte[] serializeEvent(Event event) { + return eventSerializer.toJson(event).getBytes(StandardCharsets.UTF_8); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/b1e241c4/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java ---------------------------------------------------------------------- diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java new file mode 100644 index 0000000..50a37ca --- /dev/null +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusPublishingTest.java @@ -0,0 +1,121 @@ +/**************************************************************** 
+ * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events; + +import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE; +import static org.apache.james.backend.rabbitmq.Constants.DURABLE; +import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE; +import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS; +import static org.apache.james.mailbox.events.EventBusContract.EVENT; +import static org.apache.james.mailbox.events.EventBusContract.NO_KEYS; +import static org.apache.james.mailbox.events.RabbitMQEventBus.EMPTY_ROUTING_KEY; +import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT; +import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME; +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.charset.StandardCharsets; + +import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory; +import org.apache.james.backend.rabbitmq.RabbitMQExtension; +import org.apache.james.event.json.EventSerializer; +import org.apache.james.mailbox.Event; +import 
org.apache.james.mailbox.model.TestId; +import org.apache.james.mailbox.model.TestMessageId; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.google.common.collect.ImmutableSet; + +import reactor.core.publisher.Mono; +import reactor.rabbitmq.BindingSpecification; +import reactor.rabbitmq.QueueSpecification; +import reactor.rabbitmq.RabbitFlux; +import reactor.rabbitmq.Receiver; +import reactor.rabbitmq.ReceiverOptions; +import reactor.rabbitmq.Sender; +import reactor.rabbitmq.SenderOptions; + +class RabbitMQEventBusPublishingTest { + private static final String MAILBOX_WORK_QUEUE_NAME = MAILBOX_EVENT + "-workQueue"; + + @RegisterExtension + static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension(); + + private RabbitMQEventBus eventBus; + private EventSerializer eventSerializer; + private RabbitMQConnectionFactory connectionFactory; + + @BeforeEach + void setUp() { + connectionFactory = rabbitMQExtension.getConnectionFactory(); + + eventSerializer = new EventSerializer(new TestId.Factory(), new TestMessageId.Factory()); + eventBus = new RabbitMQEventBus(connectionFactory, eventSerializer); + eventBus.start().block(); + + createQueue(); + } + + private void createQueue() { + SenderOptions senderOption = new SenderOptions() + .connectionMono(Mono.fromSupplier(connectionFactory::create)); + Sender sender = RabbitFlux.createSender(senderOption); + + sender.declareQueue(QueueSpecification.queue(MAILBOX_WORK_QUEUE_NAME) + .durable(DURABLE) + .exclusive(!EXCLUSIVE) + .autoDelete(!AUTO_DELETE) + .arguments(NO_ARGUMENTS)) + .block(); + sender.bind(BindingSpecification.binding() + .exchange(MAILBOX_EVENT_EXCHANGE_NAME) + .queue(MAILBOX_WORK_QUEUE_NAME) + .routingKey(EMPTY_ROUTING_KEY)) + .block(); + } + + @Test + void dispatchShouldPublishSerializedEventToRabbitMQ() { + eventBus.dispatch(EVENT, NO_KEYS).block(); + + assertThat(dequeueEvent()).isEqualTo(EVENT); + } 
+ + @Test + void dispatchShouldPublishSerializedEventToRabbitMQWhenNotBlocking() { + eventBus.dispatch(EVENT, NO_KEYS); + + assertThat(dequeueEvent()).isEqualTo(EVENT); + } + + + private Event dequeueEvent() { + RabbitMQConnectionFactory connectionFactory = rabbitMQExtension.getConnectionFactory(); + Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(Mono.just(connectionFactory.create()))); + + byte[] eventInBytes = receiver.consumeAutoAck(MAILBOX_WORK_QUEUE_NAME) + .blockFirst() + .getBody(); + + return eventSerializer.fromJson(new String(eventInBytes, StandardCharsets.UTF_8)) + .get(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/b1e241c4/mailbox/pom.xml ---------------------------------------------------------------------- diff --git a/mailbox/pom.xml b/mailbox/pom.xml index f7f7869..e363b7e 100644 --- a/mailbox/pom.xml +++ b/mailbox/pom.xml @@ -42,6 +42,7 @@ <module>elasticsearch</module> <module>event/event-memory</module> + <module>event/event-rabbitmq</module> <module>event/json</module> <module>jpa</module> http://git-wip-us.apache.org/repos/asf/james-project/blob/b1e241c4/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1cc4a2d..56486ae 100644 --- a/pom.xml +++ b/pom.xml @@ -762,6 +762,11 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> + <artifactId>apache-james-mailbox-event-json</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> <artifactId>apache-james-mailbox-event-memory</artifactId> <version>${project.version}</version> </dependency> @@ -3376,4 +3381,4 @@ --> </plugins> </reporting> -</project> +</project> \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: 
server-dev-h...@james.apache.org