MAILBOX-211 Remove previously contributed distributed mailbox listeners - Registration was performed on the mutable mailbox path - Tests were mostly written with mocks, making reuse harder - The proposed API implied that registrations had to be managed by the implementation itself, while some messaging software can handle this (routing on top of RabbitMQ) - We also want to propose a WorkQueue for Global Listeners
This old API made changes harder and clashes with some intended refactorings. Hence, removing it seems to be the right move. Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/46c2a279 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/46c2a279 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/46c2a279 Branch: refs/heads/master Commit: 46c2a279d2fa054de0f19dc58d3d5af27ef3f12e Parents: ef7c00f Author: Benoit Tellier <btell...@linagora.com> Authored: Mon Nov 19 09:24:27 2018 +0700 Committer: Benoit Tellier <btell...@linagora.com> Committed: Mon Nov 19 11:07:59 2018 +0700 ---------------------------------------------------------------------- .../CassandraMailboxPathRegisterMapper.java | 94 ----- ...istributedMailboxDelegatingListenerTest.java | 167 --------- ...CassandraMailboxPathRegistrerMapperTest.java | 113 ------ .../resources/META-INF/spring/event-system.xml | 30 -- .../BroadcastDelegatingMailboxListener.java | 148 -------- .../distributed/DistantMailboxPathRegister.java | 197 ----------- .../DistantMailboxPathRegisterMapper.java | 35 -- .../DistributedDelegatingMailboxListener.java | 27 -- .../event/distributed/MailboxPathRegister.java | 58 ---- .../RegisteredDelegatingMailboxListener.java | 184 ---------- ...elegatingMailboxListenerIntegrationTest.java | 149 -------- .../BroadcastDelegatingMailboxListenerTest.java | 206 ----------- .../DistantMailboxPathRegisterTest.java | 345 ------------------- .../event/distributed/PublisherReceiver.java | 72 ---- ...RegisteredDelegatingMailboxListenerTest.java | 210 ----------- 15 files changed, 2035 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/46c2a279/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java 
---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java deleted file mode 100644 index 30fa3cb..0000000 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java +++ /dev/null @@ -1,94 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. 
* - ****************************************************************/ - -package org.apache.james.mailbox.cassandra.event.distributed; - -import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; -import static com.datastax.driver.core.querybuilder.QueryBuilder.delete; -import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; -import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; -import static com.datastax.driver.core.querybuilder.QueryBuilder.select; -import static com.datastax.driver.core.querybuilder.QueryBuilder.ttl; - -import java.util.Set; -import java.util.stream.Collectors; - -import org.apache.james.backends.cassandra.init.CassandraTypesProvider; -import org.apache.james.backends.cassandra.utils.CassandraUtils; -import org.apache.james.mailbox.cassandra.table.CassandraMailboxPathRegisterTable; -import org.apache.james.mailbox.model.MailboxPath; -import org.apache.james.mailbox.store.event.distributed.DistantMailboxPathRegisterMapper; -import org.apache.james.mailbox.store.publisher.Topic; - -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.UDTValue; - -public class CassandraMailboxPathRegisterMapper implements DistantMailboxPathRegisterMapper { - - private final Session session; - private final CassandraTypesProvider typesProvider; - private final int cassandraTimeOutInS; - private final CassandraUtils cassandraUtils; - private final PreparedStatement insertStatement; - private final PreparedStatement deleteStatement; - private final PreparedStatement selectStatement; - - public CassandraMailboxPathRegisterMapper(Session session, CassandraTypesProvider typesProvider, CassandraUtils cassandraUtils, int cassandraTimeOutInS) { - this.session = session; - this.typesProvider = typesProvider; - this.cassandraTimeOutInS = cassandraTimeOutInS; - this.insertStatement = 
session.prepare(insertInto(CassandraMailboxPathRegisterTable.TABLE_NAME) - .value(CassandraMailboxPathRegisterTable.MAILBOX_PATH, bindMarker()) - .value(CassandraMailboxPathRegisterTable.TOPIC, bindMarker()) - .using(ttl(bindMarker()))); - this.deleteStatement = session.prepare(delete().from(CassandraMailboxPathRegisterTable.TABLE_NAME) - .where(eq(CassandraMailboxPathRegisterTable.MAILBOX_PATH, bindMarker())) - .and(eq(CassandraMailboxPathRegisterTable.TOPIC, bindMarker()))); - this.selectStatement = session.prepare(select().from(CassandraMailboxPathRegisterTable.TABLE_NAME) - .where(eq(CassandraMailboxPathRegisterTable.MAILBOX_PATH, bindMarker()))); - this.cassandraUtils = cassandraUtils; - } - - @Override - public Set<Topic> getTopics(MailboxPath mailboxPath) { - return cassandraUtils.convertToStream(session.execute(selectStatement.bind(buildUDTFromMailboxPath(mailboxPath)))) - .map(row -> new Topic(row.getString(CassandraMailboxPathRegisterTable.TOPIC))) - .collect(Collectors.toSet()); - } - - @Override - public void doRegister(MailboxPath mailboxPath, Topic topic) { - session.execute(insertStatement.bind(buildUDTFromMailboxPath(mailboxPath), topic.getValue(), cassandraTimeOutInS)); - } - - @Override - public void doUnRegister(MailboxPath mailboxPath, Topic topic) { - session.execute(deleteStatement.bind(buildUDTFromMailboxPath(mailboxPath), topic.getValue())); - } - - private UDTValue buildUDTFromMailboxPath(MailboxPath path) { - return typesProvider.getDefinedUserType(CassandraMailboxPathRegisterTable.MAILBOX_PATH) - .newValue() - .setString(CassandraMailboxPathRegisterTable.MailboxPath.NAMESPACE, path.getNamespace()) - .setString(CassandraMailboxPathRegisterTable.MailboxPath.USER, path.getUser()) - .setString(CassandraMailboxPathRegisterTable.MailboxPath.NAME, path.getName()); - } - -} 
http://git-wip-us.apache.org/repos/asf/james-project/blob/46c2a279/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraBasedRegisteredDistributedMailboxDelegatingListenerTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraBasedRegisteredDistributedMailboxDelegatingListenerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraBasedRegisteredDistributedMailboxDelegatingListenerTest.java deleted file mode 100644 index e5150b7..0000000 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraBasedRegisteredDistributedMailboxDelegatingListenerTest.java +++ /dev/null @@ -1,167 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. 
* - ****************************************************************/ - -package org.apache.james.mailbox.cassandra.event.distributed; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.util.TreeMap; - -import org.apache.james.backends.cassandra.CassandraCluster; -import org.apache.james.backends.cassandra.CassandraClusterExtension; -import org.apache.james.backends.cassandra.utils.CassandraUtils; -import org.apache.james.mailbox.MailboxListener; -import org.apache.james.mailbox.MailboxSession; -import org.apache.james.mailbox.MessageUid; -import org.apache.james.mailbox.cassandra.modules.CassandraRegistrationModule; -import org.apache.james.mailbox.mock.MockMailboxSession; -import org.apache.james.mailbox.model.MailboxPath; -import org.apache.james.mailbox.model.MessageMetaData; -import org.apache.james.mailbox.model.TestId; -import org.apache.james.mailbox.model.TestMessageId; -import org.apache.james.mailbox.store.TestIdDeserializer; -import org.apache.james.mailbox.store.event.EventFactory; -import org.apache.james.mailbox.store.event.distributed.DistantMailboxPathRegister; -import org.apache.james.mailbox.store.event.distributed.PublisherReceiver; -import org.apache.james.mailbox.store.event.distributed.RegisteredDelegatingMailboxListener; -import org.apache.james.mailbox.store.json.JsonEventSerializer; -import org.apache.james.mailbox.store.json.event.EventConverter; -import org.apache.james.mailbox.store.json.event.MailboxConverter; -import org.apache.james.mailbox.store.mail.model.MailboxMessage; -import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox; -import org.apache.james.mailbox.util.EventCollector; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -import com.google.common.collect.ImmutableMap; - -/** - Integration tests for RegisteredDelegatingMailboxListener using a cassandra back-end. 
- - We simulate communications using message queues in memory and check the Listener works as intended. - */ -class CassandraBasedRegisteredDistributedMailboxDelegatingListenerTest { - - public static final MailboxPath MAILBOX_PATH_1 = MailboxPath.forUser("user", "mbx"); - public static final MailboxPath MAILBOX_PATH_2 = MailboxPath.forUser("user", "mbx.other"); - public static final int CASSANDRA_TIME_OUT_IN_S = 10; - public static final int SCHEDULER_PERIOD_IN_S = 20; - public static final ImmutableMap<MessageUid, MailboxMessage> EMPTY_MESSAGE_CACHE = ImmutableMap.of(); - - @RegisterExtension - static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraRegistrationModule.MODULE); - - private RegisteredDelegatingMailboxListener registeredDelegatingMailboxListener1; - private RegisteredDelegatingMailboxListener registeredDelegatingMailboxListener2; - private RegisteredDelegatingMailboxListener registeredDelegatingMailboxListener3; - private EventCollector eventCollectorMailbox1; - private EventCollector eventCollectorMailbox2; - private EventCollector eventCollectorMailbox3; - private EventCollector eventCollectorOnce1; - private EventCollector eventCollectorOnce2; - private EventCollector eventCollectorOnce3; - private MailboxSession mailboxSession; - - @BeforeEach - void setUp(CassandraCluster cassandra) throws Exception { - PublisherReceiver publisherReceiver = new PublisherReceiver(); - DistantMailboxPathRegister mailboxPathRegister1 = new DistantMailboxPathRegister( - new CassandraMailboxPathRegisterMapper( - cassandra.getConf(), - cassandra.getTypesProvider(), - CassandraUtils.WITH_DEFAULT_CONFIGURATION, - CASSANDRA_TIME_OUT_IN_S), - SCHEDULER_PERIOD_IN_S); - JsonEventSerializer eventSerializer = new JsonEventSerializer( - new EventConverter(new MailboxConverter(new TestIdDeserializer())), - new TestMessageId.Factory()); - registeredDelegatingMailboxListener1 = new RegisteredDelegatingMailboxListener( - eventSerializer, - 
publisherReceiver, - publisherReceiver, - mailboxPathRegister1); - DistantMailboxPathRegister mailboxPathRegister2 = new DistantMailboxPathRegister( - new CassandraMailboxPathRegisterMapper( - cassandra.getConf(), - cassandra.getTypesProvider(), - CassandraUtils.WITH_DEFAULT_CONFIGURATION, - CASSANDRA_TIME_OUT_IN_S), - SCHEDULER_PERIOD_IN_S); - registeredDelegatingMailboxListener2 = new RegisteredDelegatingMailboxListener( - eventSerializer, - publisherReceiver, - publisherReceiver, - mailboxPathRegister2); - DistantMailboxPathRegister mailboxPathRegister3 = new DistantMailboxPathRegister( - new CassandraMailboxPathRegisterMapper( - cassandra.getConf(), - cassandra.getTypesProvider(), - CassandraUtils.WITH_DEFAULT_CONFIGURATION, - CASSANDRA_TIME_OUT_IN_S), - SCHEDULER_PERIOD_IN_S); - registeredDelegatingMailboxListener3 = new RegisteredDelegatingMailboxListener( - eventSerializer, - publisherReceiver, - publisherReceiver, - mailboxPathRegister3); - eventCollectorMailbox1 = new EventCollector(MailboxListener.ListenerType.MAILBOX); - eventCollectorMailbox2 = new EventCollector(MailboxListener.ListenerType.MAILBOX); - eventCollectorMailbox3 = new EventCollector(MailboxListener.ListenerType.MAILBOX); - eventCollectorOnce1 = new EventCollector(MailboxListener.ListenerType.ONCE); - eventCollectorOnce2 = new EventCollector(MailboxListener.ListenerType.ONCE); - eventCollectorOnce3 = new EventCollector(MailboxListener.ListenerType.ONCE); - mailboxSession = new MockMailboxSession("Test"); - registeredDelegatingMailboxListener1.addGlobalListener(eventCollectorOnce1, mailboxSession); - registeredDelegatingMailboxListener2.addGlobalListener(eventCollectorOnce2, mailboxSession); - registeredDelegatingMailboxListener3.addGlobalListener(eventCollectorOnce3, mailboxSession); - registeredDelegatingMailboxListener1.addListener(MAILBOX_PATH_1, eventCollectorMailbox1, mailboxSession); - registeredDelegatingMailboxListener2.addListener(MAILBOX_PATH_1, eventCollectorMailbox2, 
mailboxSession); - registeredDelegatingMailboxListener3.addListener(MAILBOX_PATH_2, eventCollectorMailbox3, mailboxSession); - } - - @Test - void mailboxEventListenersShouldBeTriggeredIfRegistered() { - SimpleMailbox simpleMailbox = new SimpleMailbox(MAILBOX_PATH_1, 42); - simpleMailbox.setMailboxId(TestId.of(52)); - TreeMap<MessageUid, MessageMetaData> uids = new TreeMap<>(); - final MailboxListener.MailboxEvent event = new EventFactory().added(mailboxSession, uids, simpleMailbox, EMPTY_MESSAGE_CACHE); - - registeredDelegatingMailboxListener1.event(event); - - assertThat(eventCollectorMailbox1.getEvents()).hasSize(1); - assertThat(eventCollectorMailbox2.getEvents()).hasSize(1); - assertThat(eventCollectorMailbox3.getEvents()).isEmpty(); - } - - @Test - void onceEventListenersShouldBeTriggeredOnceAcrossTheCluster() { - SimpleMailbox simpleMailbox = new SimpleMailbox(MAILBOX_PATH_1, 42); - simpleMailbox.setMailboxId(TestId.of(52)); - TreeMap<MessageUid, MessageMetaData> uids = new TreeMap<>(); - final MailboxListener.MailboxEvent event = new EventFactory().added(mailboxSession, uids, simpleMailbox, EMPTY_MESSAGE_CACHE); - - registeredDelegatingMailboxListener1.event(event); - - assertThat(eventCollectorOnce1.getEvents()).hasSize(1); - assertThat(eventCollectorOnce2.getEvents()).isEmpty(); - assertThat(eventCollectorOnce3.getEvents()).isEmpty(); - } - -} http://git-wip-us.apache.org/repos/asf/james-project/blob/46c2a279/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegistrerMapperTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegistrerMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegistrerMapperTest.java deleted file mode 100644 index 213006a..0000000 --- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegistrerMapperTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. 
* - ****************************************************************/ - -package org.apache.james.mailbox.cassandra.event.distributed; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.util.concurrent.TimeUnit; - -import org.apache.james.backends.cassandra.CassandraCluster; -import org.apache.james.backends.cassandra.CassandraClusterExtension; -import org.apache.james.backends.cassandra.utils.CassandraUtils; -import org.apache.james.mailbox.cassandra.modules.CassandraRegistrationModule; -import org.apache.james.mailbox.model.MailboxPath; -import org.apache.james.mailbox.store.publisher.Topic; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -class CassandraMailboxPathRegistrerMapperTest { - @RegisterExtension - static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraRegistrationModule.MODULE); - - private static final MailboxPath MAILBOX_PATH = new MailboxPath("namespace", "user", "name"); - private static final MailboxPath MAILBOX_PATH_2 = new MailboxPath("namespace2", "user2", "name2"); - private static final Topic TOPIC = new Topic("topic"); - private static final int CASSANDRA_TIME_OUT_IN_S = 100; - private static final Topic TOPIC_2 = new Topic("topic2"); - - private CassandraMailboxPathRegisterMapper mapper; - - @BeforeEach - void setUp(CassandraCluster cassandra) { - mapper = new CassandraMailboxPathRegisterMapper(cassandra.getConf(), - cassandra.getTypesProvider(), - CassandraUtils.WITH_DEFAULT_CONFIGURATION, - CASSANDRA_TIME_OUT_IN_S); - } - - @Test - void getTopicsShouldReturnEmptyResultByDefault() { - assertThat(mapper.getTopics(MAILBOX_PATH)).isEmpty(); - } - - @Test - void doRegisterShouldWork() { - mapper.doRegister(MAILBOX_PATH, TOPIC); - assertThat(mapper.getTopics(MAILBOX_PATH)).containsOnly(TOPIC); - } - - @Test - void doRegisterShouldBeMailboxPathSpecific() { - mapper.doRegister(MAILBOX_PATH, 
TOPIC); - assertThat(mapper.getTopics(MAILBOX_PATH_2)).isEmpty(); - } - - @Test - void doRegisterShouldAllowMultipleTopics() { - mapper.doRegister(MAILBOX_PATH, TOPIC); - mapper.doRegister(MAILBOX_PATH, TOPIC_2); - assertThat(mapper.getTopics(MAILBOX_PATH)).containsOnly(TOPIC, TOPIC_2); - } - - @Test - void doUnRegisterShouldWork() { - mapper.doRegister(MAILBOX_PATH, TOPIC); - mapper.doUnRegister(MAILBOX_PATH, TOPIC); - assertThat(mapper.getTopics(MAILBOX_PATH)).isEmpty(); - } - - @Test - void doUnregisterShouldBeMailboxSpecific() { - mapper.doRegister(MAILBOX_PATH, TOPIC); - mapper.doUnRegister(MAILBOX_PATH_2, TOPIC); - assertThat(mapper.getTopics(MAILBOX_PATH)).containsOnly(TOPIC); - } - - @Test - void doUnregisterShouldBeTopicSpecific() { - mapper.doRegister(MAILBOX_PATH, TOPIC); - mapper.doUnRegister(MAILBOX_PATH, TOPIC_2); - assertThat(mapper.getTopics(MAILBOX_PATH)).containsOnly(TOPIC); - } - - @Test - void entriesShouldExpire(CassandraCluster cassandra) throws Exception { - int verySmallTimeoutInSecond = 1; - mapper = new CassandraMailboxPathRegisterMapper(cassandra.getConf(), - cassandra.getTypesProvider(), - CassandraUtils.WITH_DEFAULT_CONFIGURATION, - verySmallTimeoutInSecond); - mapper.doRegister(MAILBOX_PATH, TOPIC); - Thread.sleep(2 * TimeUnit.SECONDS.toMillis(verySmallTimeoutInSecond)); - assertThat(mapper.getTopics(MAILBOX_PATH)).isEmpty(); - } - -} http://git-wip-us.apache.org/repos/asf/james-project/blob/46c2a279/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml ---------------------------------------------------------------------- diff --git a/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml b/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml index b168c72..44eace7 100644 --- a/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml +++ b/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml @@ -28,40 +28,10 @@ <constructor-arg index="1" ref="event-registry"/> </bean> - 
<bean id="broadcast-delegating-listener" class="org.apache.james.mailbox.store.event.distributed.BroadcastDelegatingMailboxListener" lazy-init="true"> - <constructor-arg index="0" ref="publisher"/> - <constructor-arg index="1" ref="consumer"/> - <constructor-arg index="2" ref="event-serializer"/> - <constructor-arg index="3" ref="event-delivery"/> - <constructor-arg index="4" ref="${global.topic}"/> - </bean> - - <bean id="registered-delegating-listener" class="org.apache.james.mailbox.store.event.distributed.RegisteredDelegatingMailboxListener" lazy-init="true"> - <constructor-arg index="0" ref="publisher"/> - <constructor-arg index="1" ref="consumer"/> - <constructor-arg index="2" ref="event-serializer"/> - <constructor-arg index="3" ref="mailbox-path-register"/> - <constructor-arg index="4" ref="event-delivery"/> - </bean> - - <bean id="mailbox-path-register" class="org.apache.james.mailbox.store.event.distributed.DistantMailboxPathRegister" lazy-init="true"> - <constructor-arg index="0" ref="distant-mailbox-path-register-mapper"/> - <constructor-arg index="1" ref="${distant.mailbox.path.register.max.retries}"/> - <constructor-arg index="2" ref="${distant.mailbox.path.register.refresh}"/> - </bean> - - <bean id="json-event-serializer" class="org.apache.james.mailbox.store.json.JsonEventSerializer" lazy-init="true"> - <constructor-arg index="0" ref="mailbox-converter"/> - </bean> - <bean id="event-converter" class="org.apache.james.mailbox.store.json.event.EventConverter" lazy-init="true"> <constructor-arg index="0" ref="mailbox-converter"/> </bean> - <bean id="mailbox-converter" class="org.apache.james.mailbox.store.json.event.MailboxConverter" lazy-init="true"> - <constructor-arg index="0" ref="mailbox-id-deserializer"/> - </bean> - <bean id="synchronous-event-delivery" class="org.apache.james.mailbox.store.event.SynchronousEventDelivery" lazy-init="true"> <constructor-arg index="0" ref="metricFactory"/> </bean> 
http://git-wip-us.apache.org/repos/asf/james-project/blob/46c2a279/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListener.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListener.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListener.java deleted file mode 100644 index 2171e80..0000000 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListener.java +++ /dev/null @@ -1,148 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. 
* - ****************************************************************/ - -package org.apache.james.mailbox.store.event.distributed; - -import java.util.Collection; - -import org.apache.james.mailbox.Event; -import org.apache.james.mailbox.MailboxListener; -import org.apache.james.mailbox.MailboxSession; -import org.apache.james.mailbox.exception.MailboxException; -import org.apache.james.mailbox.model.MailboxPath; -import org.apache.james.mailbox.store.event.EventDelivery; -import org.apache.james.mailbox.store.event.EventSerializer; -import org.apache.james.mailbox.store.event.MailboxListenerRegistry; -import org.apache.james.mailbox.store.event.SynchronousEventDelivery; -import org.apache.james.mailbox.store.publisher.MessageConsumer; -import org.apache.james.mailbox.store.publisher.Publisher; -import org.apache.james.mailbox.store.publisher.Topic; -import org.apache.james.metrics.api.NoopMetricFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.VisibleForTesting; - -public class BroadcastDelegatingMailboxListener implements DistributedDelegatingMailboxListener { - - private static final Logger LOGGER = LoggerFactory.getLogger(BroadcastDelegatingMailboxListener.class); - - private final MailboxListenerRegistry mailboxListenerRegistry; - private final Publisher publisher; - private final EventSerializer eventSerializer; - private final Topic globalTopic; - private final EventDelivery eventDelivery; - - public BroadcastDelegatingMailboxListener(Publisher publisher, - MessageConsumer messageConsumer, - EventSerializer eventSerializer, - EventDelivery eventDelivery, - String globalTopic) throws Exception { - this.mailboxListenerRegistry = new MailboxListenerRegistry(); - this.publisher = publisher; - this.eventSerializer = eventSerializer; - this.globalTopic = new Topic(globalTopic); - this.eventDelivery = eventDelivery; - messageConsumer.setMessageReceiver(this); - messageConsumer.init(this.globalTopic); - } - - 
@VisibleForTesting - public BroadcastDelegatingMailboxListener(Publisher publisher, - MessageConsumer messageConsumer, - EventSerializer eventSerializer, - String globalTopic) throws Exception { - this(publisher, messageConsumer, eventSerializer, new SynchronousEventDelivery(new NoopMetricFactory()), globalTopic); - } - - @Override - public ListenerType getType() { - return ListenerType.ONCE; - } - - @Override - public void addListener(MailboxPath mailboxPath, MailboxListener listener, MailboxSession session) throws MailboxException { - mailboxListenerRegistry.addListener(mailboxPath, listener); - } - - @Override - public void removeListener(MailboxPath mailboxPath, MailboxListener listener, MailboxSession session) throws MailboxException { - mailboxListenerRegistry.removeListener(mailboxPath, listener); - } - - @Override - public void addGlobalListener(MailboxListener listener, MailboxSession session) throws MailboxException { - mailboxListenerRegistry.addGlobalListener(listener); - } - - @Override - public void removeGlobalListener(MailboxListener listener, MailboxSession session) throws MailboxException { - mailboxListenerRegistry.removeGlobalListener(listener); - } - - @Override - public void event(Event event) { - deliverEventToGlobalListeners(event, ListenerType.ONCE); - if (event instanceof MailboxEvent) { - MailboxEvent mailboxEvent = (MailboxEvent) event; - publishMailboxEvent(mailboxEvent); - } - } - - private void publishMailboxEvent(MailboxEvent event) { - try { - publisher.publish(globalTopic, eventSerializer.serializeEvent(event)); - } catch (Throwable t) { - LOGGER.error("Error while sending event to publisher", t); - } - } - - @Override - public void receiveSerializedEvent(byte[] serializedEvent) { - try { - MailboxEvent event = eventSerializer.deSerializeEvent(serializedEvent); - deliverToMailboxPathRegisteredListeners(event); - deliverEventToGlobalListeners(event, ListenerType.EACH_NODE); - } catch (Exception e) { - LOGGER.error("Error while 
receiving serialized event", e); - } - } - - private void deliverToMailboxPathRegisteredListeners(MailboxEvent event) { - Collection<MailboxListener> listenerSnapshot = mailboxListenerRegistry.getLocalMailboxListeners(event.getMailboxPath()); - if (event instanceof MailboxDeletion) { - mailboxListenerRegistry.deleteRegistryFor(event.getMailboxPath()); - } else if (event instanceof MailboxRenamed) { - MailboxRenamed renamed = (MailboxRenamed) event; - mailboxListenerRegistry.handleRename(renamed.getMailboxPath(), renamed.getNewPath()); - } - for (MailboxListener listener : listenerSnapshot) { - eventDelivery.deliver(listener, event); - } - } - - private void deliverEventToGlobalListeners(Event event, ListenerType type) { - for (MailboxListener mailboxListener : mailboxListenerRegistry.getGlobalListeners()) { - if (mailboxListener.getType() == type) { - eventDelivery.deliver(mailboxListener, event); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/james-project/blob/46c2a279/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegister.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegister.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegister.java deleted file mode 100644 index c92cb87..0000000 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegister.java +++ /dev/null @@ -1,197 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. 
The ASF licenses this file  *
 * to you under the Apache License, Version 2.0 (the            *
 * "License"); you may not use this file except in compliance   *
 * with the License.  You may obtain a copy of the License at   *
 *                                                              *
 *   http://www.apache.org/licenses/LICENSE-2.0                 *
 *                                                              *
 * Unless required by applicable law or agreed to in writing,   *
 * software distributed under the License is distributed on an  *
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
 * KIND, either express or implied.  See the License for the    *
 * specific language governing permissions and limitations      *
 * under the License.                                           *
 ****************************************************************/

package org.apache.james.mailbox.store.event.distributed;

import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mailbox.store.publisher.Topic;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;

/**
 * {@link MailboxPathRegister} that keeps a local reference count per mailbox path and
 * mirrors "this node listens to that path" into a {@link DistantMailboxPathRegisterMapper}.
 *
 * <p>Each instance owns a randomly named {@link Topic}. Counts are updated with optimistic
 * compare-and-set operations on a {@link ConcurrentHashMap}, retried at most
 * {@code maxRetry} times. A timer periodically re-registers every path whose count is
 * positive.
 */
public class DistantMailboxPathRegister implements MailboxPathRegister {
    private static final int DEFAULT_MAX_RETRY = 1000;

    private final ConcurrentHashMap<MailboxPath, Long> registeredMailboxPathCount;
    private final DistantMailboxPathRegisterMapper mapper;
    private final Topic topic;
    private final Timer timer;
    private final int maxRetry;
    private final long schedulerPeriodInS;

    /**
     * @param mapper             backend recording the path/topic associations
     * @param schedulerPeriodInS period, in seconds, of the periodic re-registration
     */
    public DistantMailboxPathRegister(DistantMailboxPathRegisterMapper mapper, long schedulerPeriodInS) {
        this(mapper, DEFAULT_MAX_RETRY, schedulerPeriodInS);
    }

    /**
     * @param mapper             backend recording the path/topic associations
     * @param maxRetry           maximum number of compare-and-set attempts per operation
     * @param schedulerPeriodInS period, in seconds, of the periodic re-registration
     */
    public DistantMailboxPathRegister(DistantMailboxPathRegisterMapper mapper, int maxRetry, long schedulerPeriodInS) {
        this.maxRetry = maxRetry;
        this.mapper = mapper;
        this.registeredMailboxPathCount = new ConcurrentHashMap<>();
        this.topic = new Topic(UUID.randomUUID().toString());
        this.timer = new Timer();
        this.schedulerPeriodInS = schedulerPeriodInS;
    }

    /** Starts the periodic task that re-registers every locally counted path on the mapper. */
    @PostConstruct
    public void init() {
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                // Iterate over a copy so concurrent (un)registrations cannot disturb the loop.
                Set<Map.Entry<MailboxPath, Long>> snapshot = ImmutableSet.copyOf(registeredMailboxPathCount.entrySet());
                for (Map.Entry<MailboxPath, Long> entry : snapshot) {
                    if (entry.getValue() > 0) {
                        mapper.doRegister(entry.getKey(), topic);
                    }
                }
            }
        }, 0L, schedulerPeriodInS * 1000); // Timer expects milliseconds
    }

    /** Stops the periodic re-registration task. */
    @PreDestroy
    public void destroy() {
        timer.cancel();
        timer.purge();
    }

    @Override
    public Set<Topic> getTopics(MailboxPath mailboxPath) {
        return mapper.getTopics(mailboxPath);
    }

    @Override
    public Topic getLocalTopic() {
        return topic;
    }

    /**
     * Increments the local count of {@code path}, registering it on the mapper when it is
     * the first registration.
     *
     * @throws MailboxException when the update still fails after {@code maxRetry} attempts
     */
    @Override
    public void register(MailboxPath path) throws MailboxException {
        int count = 0;
        boolean success = false;
        while (count < maxRetry && !success) {
            count++;
            success = tryRegister(path);
        }
        if (!success) {
            throw new MailboxException(maxRetry + " reached while trying to register " + path);
        }
    }

    /**
     * Decrements the local count of {@code path}, unregistering it from the mapper when the
     * count drops to zero.
     *
     * @throws MailboxException when {@code path} is not registered, or when the update still
     *                          fails after {@code maxRetry} attempts
     */
    @Override
    public void unregister(MailboxPath path) throws MailboxException {
        int count = 0;
        boolean success = false;
        while (count < maxRetry && !success) {
            count++;
            success = tryUnregister(path);
        }
        if (!success) {
            throw new MailboxException(maxRetry + " reached while trying to unregister " + path);
        }
    }

    /** Drops the local count of {@code mailboxPath}, whatever its value, and unregisters it from the mapper. */
    @Override
    public void doCompleteUnRegister(MailboxPath mailboxPath) {
        registeredMailboxPathCount.remove(mailboxPath);
        mapper.doUnRegister(mailboxPath, topic);
    }

    /**
     * Transfers the count of {@code oldPath} onto {@code newPath}, then completely
     * unregisters {@code oldPath} (even when the transfer failed).
     *
     * @throws MailboxException when {@code oldPath} is not registered, or when the transfer
     *                          still fails after {@code maxRetry} attempts
     */
    @Override
    public void doRename(MailboxPath oldPath, MailboxPath newPath) throws MailboxException {
        try {
            int count = 0;
            boolean success = false;
            while (count < maxRetry && !success) {
                // The attempt counter must advance, otherwise the maxRetry bound is never
                // enforced and a persistently failing transfer would spin forever.
                count++;
                success = tryCountTransfer(oldPath, newPath);
            }
            if (!success) {
                throw new MailboxException(maxRetry + " reached while trying to rename " + oldPath + " in " + newPath);
            }
        } finally {
            doCompleteUnRegister(oldPath);
        }
    }

    /** One optimistic attempt at adding the count of {@code oldPath} to {@code newPath}; false means "retry". */
    private boolean tryCountTransfer(MailboxPath oldPath, MailboxPath newPath) throws MailboxException {
        Long oldEntry = registeredMailboxPathCount.get(oldPath);
        if (oldEntry == null) {
            throw new MailboxException("Renamed entry does not exists");
        }
        Long entry = registeredMailboxPathCount.get(newPath);
        if (entry != null) {
            return registeredMailboxPathCount.replace(newPath, entry, oldEntry + entry);
        } else {
            if (registeredMailboxPathCount.putIfAbsent(newPath, oldEntry) == null) {
                mapper.doRegister(newPath, topic);
                return true;
            }
            return false;
        }
    }

    /** One optimistic attempt at incrementing the count of {@code path}; false means "retry". */
    private boolean tryRegister(MailboxPath path) {
        Long entry = registeredMailboxPathCount.get(path);
        Long newEntry = entry;
        if (newEntry == null) {
            newEntry = 0L;
        }
        newEntry++;
        if (entry != null) {
            return registeredMailboxPathCount.replace(path, entry, newEntry);
        } else {
            if (registeredMailboxPathCount.putIfAbsent(path, newEntry) == null) {
                mapper.doRegister(path, topic);
                return true;
            }
            return false;
        }
    }

    /** One optimistic attempt at decrementing the count of {@code path}; false means "retry". */
    private boolean tryUnregister(MailboxPath path) throws MailboxException {
        Long entry = registeredMailboxPathCount.get(path);
        Long newEntry = entry;
        if (newEntry == null) {
            throw new MailboxException("Removing a non registered mailboxPath");
        }
        newEntry--;
        if (newEntry != 0) {
            return registeredMailboxPathCount.replace(path, entry, newEntry);
        } else {
            if (registeredMailboxPathCount.remove(path, entry)) {
                mapper.doUnRegister(path, topic);
                return true;
            }
            return false;
        }
    }

    @VisibleForTesting
    ConcurrentHashMap<MailboxPath, Long> getRegisteredMailboxPathCount() {
        return registeredMailboxPathCount;
    }
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/46c2a279/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterMapper.java deleted file mode 100644 index c672b50..0000000 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterMapper.java +++ /dev/null @@ -1,35 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. 
* - ****************************************************************/ - -package org.apache.james.mailbox.store.event.distributed; - -import java.util.Set; - -import org.apache.james.mailbox.model.MailboxPath; -import org.apache.james.mailbox.store.publisher.Topic; - -public interface DistantMailboxPathRegisterMapper { - - Set<Topic> getTopics(MailboxPath mailboxPath); - - void doRegister(MailboxPath mailboxPath, Topic topic); - - void doUnRegister(MailboxPath mailboxPath, Topic topic); - -} http://git-wip-us.apache.org/repos/asf/james-project/blob/46c2a279/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistributedDelegatingMailboxListener.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistributedDelegatingMailboxListener.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistributedDelegatingMailboxListener.java deleted file mode 100644 index d78688f..0000000 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistributedDelegatingMailboxListener.java +++ /dev/null @@ -1,27 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. 
You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.mailbox.store.event.distributed; - -import org.apache.james.mailbox.store.event.DelegatingMailboxListener; -import org.apache.james.mailbox.store.publisher.MessageReceiver; - -public interface DistributedDelegatingMailboxListener extends DelegatingMailboxListener, MessageReceiver { - -} http://git-wip-us.apache.org/repos/asf/james-project/blob/46c2a279/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/MailboxPathRegister.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/MailboxPathRegister.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/MailboxPathRegister.java deleted file mode 100644 index 49a7627..0000000 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/MailboxPathRegister.java +++ /dev/null @@ -1,58 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. 
You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.mailbox.store.event.distributed; - -import java.util.Set; - -import org.apache.james.mailbox.exception.MailboxException; -import org.apache.james.mailbox.model.MailboxPath; -import org.apache.james.mailbox.store.publisher.Topic; - -/** - * The TopicDispatcher allow you to : - * - * - know to which queues you will need to send an event - * - get the topic this James instance will be pooling - */ -public interface MailboxPathRegister { - - /** - * Given a MailboxPath, we want to get the different topics we need to send the event to. 
- * - * @param mailboxPath MailboxPath - * @return List of topics concerned by this MailboxPath - */ - Set<Topic> getTopics(MailboxPath mailboxPath); - - /** - * Get the topic this James instance will consume - * - * @return The topic this James instance will consume - */ - Topic getLocalTopic(); - - void register(MailboxPath path) throws MailboxException; - - void unregister(MailboxPath path) throws MailboxException; - - void doCompleteUnRegister(MailboxPath mailboxPath); - - void doRename(MailboxPath oldPath, MailboxPath newPath) throws MailboxException; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/46c2a279/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/RegisteredDelegatingMailboxListener.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/RegisteredDelegatingMailboxListener.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/RegisteredDelegatingMailboxListener.java deleted file mode 100644 index 8165536..0000000 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/RegisteredDelegatingMailboxListener.java +++ /dev/null @@ -1,184 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. 
You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.mailbox.store.event.distributed; - -import java.util.Collection; -import java.util.Set; - -import org.apache.james.mailbox.Event; -import org.apache.james.mailbox.MailboxListener; -import org.apache.james.mailbox.MailboxSession; -import org.apache.james.mailbox.exception.MailboxException; -import org.apache.james.mailbox.model.MailboxPath; -import org.apache.james.mailbox.store.event.EventDelivery; -import org.apache.james.mailbox.store.event.EventSerializer; -import org.apache.james.mailbox.store.event.MailboxListenerRegistry; -import org.apache.james.mailbox.store.event.SynchronousEventDelivery; -import org.apache.james.mailbox.store.publisher.MessageConsumer; -import org.apache.james.mailbox.store.publisher.Publisher; -import org.apache.james.mailbox.store.publisher.Topic; -import org.apache.james.metrics.api.NoopMetricFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.VisibleForTesting; - -public class RegisteredDelegatingMailboxListener implements DistributedDelegatingMailboxListener { - - private static final Logger LOGGER = LoggerFactory.getLogger(RegisteredDelegatingMailboxListener.class); - - private final MailboxListenerRegistry mailboxListenerRegistry; - private final MailboxPathRegister mailboxPathRegister; - private final Publisher publisher; - private final EventSerializer eventSerializer; - private final EventDelivery eventDelivery; - - public 
RegisteredDelegatingMailboxListener(EventSerializer eventSerializer, - Publisher publisher, - MessageConsumer messageConsumer, - MailboxPathRegister mailboxPathRegister, - EventDelivery eventDelivery) throws Exception { - this.eventSerializer = eventSerializer; - this.publisher = publisher; - this.mailboxPathRegister = mailboxPathRegister; - this.mailboxListenerRegistry = new MailboxListenerRegistry(); - this.eventDelivery = eventDelivery; - messageConsumer.setMessageReceiver(this); - messageConsumer.init(mailboxPathRegister.getLocalTopic()); - } - - @VisibleForTesting - public RegisteredDelegatingMailboxListener(EventSerializer eventSerializer, - Publisher publisher, - MessageConsumer messageConsumer, - MailboxPathRegister mailboxPathRegister) throws Exception { - this(eventSerializer, publisher, messageConsumer, mailboxPathRegister, new SynchronousEventDelivery(new NoopMetricFactory())); - } - - @Override - public ListenerType getType() { - return ListenerType.ONCE; - } - - @Override - public void addListener(MailboxPath path, MailboxListener listener, MailboxSession session) throws MailboxException { - mailboxListenerRegistry.addListener(path, listener); - mailboxPathRegister.register(path); - } - - @Override - public void addGlobalListener(MailboxListener listener, MailboxSession session) throws MailboxException { - if (listener.getType().equals(ListenerType.EACH_NODE)) { - throw new MailboxException("Attempt to register a global listener that need to be called on each node while using a non compatible delegating listeners"); - } - mailboxListenerRegistry.addGlobalListener(listener); - } - - @Override - public void removeListener(MailboxPath mailboxPath, MailboxListener listener, MailboxSession session) throws MailboxException { - mailboxListenerRegistry.removeListener(mailboxPath, listener); - mailboxPathRegister.unregister(mailboxPath); - } - - @Override - public void removeGlobalListener(MailboxListener listener, MailboxSession session) throws 
MailboxException { - mailboxListenerRegistry.removeGlobalListener(listener); - } - - @Override - public void event(Event event) { - try { - deliverEventToOnceGlobalListeners(event); - deliverToMailboxPathRegisteredListeners(event); - if (event instanceof MailboxEvent) { - MailboxEvent mailboxEvent = (MailboxEvent) event; - sendToRemoteJames(mailboxEvent); - } - } catch (Throwable t) { - LOGGER.error("Error while delegating event {}", event.getClass().getCanonicalName(), t); - } - } - - @Override - public void receiveSerializedEvent(byte[] serializedEvent) { - try { - MailboxEvent event = eventSerializer.deSerializeEvent(serializedEvent); - deliverToMailboxPathRegisteredListeners(event); - } catch (Exception e) { - LOGGER.error("Error while receiving serialized event", e); - } - } - - private void deliverToMailboxPathRegisteredListeners(Event event) throws MailboxException { - if (event instanceof MailboxEvent) { - MailboxEvent mailboxEvent = (MailboxEvent) event; - deliverToMailboxPathRegisteredListeners(mailboxEvent); - } - } - - private void deliverToMailboxPathRegisteredListeners(MailboxEvent mailboxEvent) throws MailboxException { - Collection<MailboxListener> listenerSnapshot = mailboxListenerRegistry.getLocalMailboxListeners(mailboxEvent.getMailboxPath()); - if (mailboxEvent instanceof MailboxDeletion && listenerSnapshot.size() > 0) { - mailboxListenerRegistry.deleteRegistryFor(mailboxEvent.getMailboxPath()); - mailboxPathRegister.doCompleteUnRegister(mailboxEvent.getMailboxPath()); - } else if (mailboxEvent instanceof MailboxRenamed && listenerSnapshot.size() > 0) { - MailboxRenamed renamed = (MailboxRenamed) mailboxEvent; - mailboxListenerRegistry.handleRename(renamed.getMailboxPath(), renamed.getNewPath()); - mailboxPathRegister.doRename(renamed.getMailboxPath(), renamed.getNewPath()); - } - for (MailboxListener listener : listenerSnapshot) { - eventDelivery.deliver(listener, mailboxEvent); - } - } - - private void deliverEventToOnceGlobalListeners(Event 
event) { - for (MailboxListener mailboxListener : mailboxListenerRegistry.getGlobalListeners()) { - if (mailboxListener.getType() == ListenerType.ONCE) { - eventDelivery.deliver(mailboxListener, event); - } - } - } - - private void sendToRemoteJames(MailboxEvent event) { - Set<Topic> topics = mailboxPathRegister.getTopics(event.getMailboxPath()); - topics.remove(mailboxPathRegister.getLocalTopic()); - if (topics.size() > 0) { - sendEventToRemotesJamesByTopic(event, topics); - } - } - - private void sendEventToRemotesJamesByTopic(MailboxEvent event, Set<Topic> topics) { - byte[] serializedEvent; - try { - serializedEvent = eventSerializer.serializeEvent(event); - } catch (Exception e) { - LOGGER.error("Unable to serialize {}", event.getClass().getCanonicalName(), e); - return; - } - for (Topic topic : topics) { - try { - publisher.publish(topic, serializedEvent); - } catch (Throwable t) { - LOGGER.error("Unable to send serialized event to topic {}", topic); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/james-project/blob/46c2a279/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListenerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListenerIntegrationTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListenerIntegrationTest.java deleted file mode 100644 index 9403bfc..0000000 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListenerIntegrationTest.java +++ /dev/null @@ -1,149 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. 
See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.mailbox.store.event.distributed; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.util.TreeMap; - -import org.apache.james.mailbox.MailboxListener; -import org.apache.james.mailbox.MailboxSession; -import org.apache.james.mailbox.MessageUid; -import org.apache.james.mailbox.mock.MockMailboxSession; -import org.apache.james.mailbox.model.MailboxPath; -import org.apache.james.mailbox.model.TestId; -import org.apache.james.mailbox.model.TestMessageId; -import org.apache.james.mailbox.store.TestIdDeserializer; -import org.apache.james.mailbox.store.event.EventFactory; -import org.apache.james.mailbox.store.json.JsonEventSerializer; -import org.apache.james.mailbox.store.json.event.EventConverter; -import org.apache.james.mailbox.store.json.event.MailboxConverter; -import org.apache.james.mailbox.store.mail.model.MailboxMessage; -import org.apache.james.mailbox.store.mail.model.impl.SimpleMailbox; -import org.apache.james.mailbox.util.EventCollector; -import org.junit.Before; -import org.junit.Test; - -import com.google.common.collect.ImmutableMap; - -/** - Integration tests for BroadcastDelegatingMailboxListener. 
- - We simulate communications using message queues in memory and check the Listener works as intended. - */ -public class BroadcastDelegatingMailboxListenerIntegrationTest { - - public static final MailboxPath MAILBOX_PATH_1 = MailboxPath.forUser("user", "mbx"); - public static final MailboxPath MAILBOX_PATH_2 = MailboxPath.forUser("user", "mbx.other"); - public static final String TOPIC = "TOPIC"; - public static final ImmutableMap<MessageUid, MailboxMessage> EMPTY_MESSAGE_CACHE = ImmutableMap.<MessageUid, MailboxMessage>of(); - private BroadcastDelegatingMailboxListener broadcastDelegatingMailboxListener1; - private BroadcastDelegatingMailboxListener broadcastDelegatingMailboxListener2; - private BroadcastDelegatingMailboxListener broadcastDelegatingMailboxListener3; - private EventCollector eventCollectorMailbox1; - private EventCollector eventCollectorMailbox2; - private EventCollector eventCollectorMailbox3; - private EventCollector eventCollectorOnce1; - private EventCollector eventCollectorOnce2; - private EventCollector eventCollectorOnce3; - private EventCollector eventCollectorEach1; - private EventCollector eventCollectorEach2; - private EventCollector eventCollectorEach3; - private MailboxSession mailboxSession; - - @Before - public void setUp() throws Exception { - PublisherReceiver publisherReceiver = new PublisherReceiver(); - JsonEventSerializer eventSerializer = new JsonEventSerializer( - new EventConverter(new MailboxConverter(new TestIdDeserializer())), - new TestMessageId.Factory()); - broadcastDelegatingMailboxListener1 = new BroadcastDelegatingMailboxListener(publisherReceiver, - publisherReceiver, - eventSerializer, - TOPIC); - broadcastDelegatingMailboxListener2 = new BroadcastDelegatingMailboxListener(publisherReceiver, - publisherReceiver, - eventSerializer, - TOPIC); - broadcastDelegatingMailboxListener3 = new BroadcastDelegatingMailboxListener(publisherReceiver, - publisherReceiver, - eventSerializer, - TOPIC); - eventCollectorMailbox1 
= new EventCollector(MailboxListener.ListenerType.MAILBOX); - eventCollectorMailbox2 = new EventCollector(MailboxListener.ListenerType.MAILBOX); - eventCollectorMailbox3 = new EventCollector(MailboxListener.ListenerType.MAILBOX); - eventCollectorOnce1 = new EventCollector(MailboxListener.ListenerType.ONCE); - eventCollectorOnce2 = new EventCollector(MailboxListener.ListenerType.ONCE); - eventCollectorOnce3 = new EventCollector(MailboxListener.ListenerType.ONCE); - eventCollectorEach1 = new EventCollector(MailboxListener.ListenerType.EACH_NODE); - eventCollectorEach2 = new EventCollector(MailboxListener.ListenerType.EACH_NODE); - eventCollectorEach3 = new EventCollector(MailboxListener.ListenerType.EACH_NODE); - mailboxSession = new MockMailboxSession("Test"); - broadcastDelegatingMailboxListener1.addGlobalListener(eventCollectorOnce1, mailboxSession); - broadcastDelegatingMailboxListener2.addGlobalListener(eventCollectorOnce2, mailboxSession); - broadcastDelegatingMailboxListener3.addGlobalListener(eventCollectorOnce3, mailboxSession); - broadcastDelegatingMailboxListener1.addGlobalListener(eventCollectorEach1, mailboxSession); - broadcastDelegatingMailboxListener2.addGlobalListener(eventCollectorEach2, mailboxSession); - broadcastDelegatingMailboxListener3.addGlobalListener(eventCollectorEach3, mailboxSession); - broadcastDelegatingMailboxListener1.addListener(MAILBOX_PATH_1, eventCollectorMailbox1, mailboxSession); - broadcastDelegatingMailboxListener2.addListener(MAILBOX_PATH_1, eventCollectorMailbox2, mailboxSession); - broadcastDelegatingMailboxListener3.addListener(MAILBOX_PATH_2, eventCollectorMailbox3, mailboxSession); - } - - @Test - public void mailboxEventListenersShouldBeTriggeredIfRegistered() throws Exception { - SimpleMailbox simpleMailbox = new SimpleMailbox(MAILBOX_PATH_1, 42); - simpleMailbox.setMailboxId(TestId.of(52)); - final MailboxListener.MailboxEvent event = new EventFactory().added(mailboxSession, new TreeMap<>(), simpleMailbox, 
EMPTY_MESSAGE_CACHE); - - broadcastDelegatingMailboxListener1.event(event); - - assertThat(eventCollectorMailbox1.getEvents()).hasSize(1); - assertThat(eventCollectorMailbox2.getEvents()).hasSize(1); - assertThat(eventCollectorMailbox3.getEvents()).isEmpty(); - } - - @Test - public void onceEventListenersShouldBeTriggeredOnceAcrossTheCluster() { - SimpleMailbox simpleMailbox = new SimpleMailbox(MAILBOX_PATH_1, 42); - simpleMailbox.setMailboxId(TestId.of(52)); - final MailboxListener.MailboxEvent event = new EventFactory().added(mailboxSession, new TreeMap<>(), simpleMailbox, EMPTY_MESSAGE_CACHE); - - broadcastDelegatingMailboxListener1.event(event); - - assertThat(eventCollectorOnce1.getEvents()).hasSize(1); - assertThat(eventCollectorOnce2.getEvents()).isEmpty(); - assertThat(eventCollectorOnce3.getEvents()).isEmpty(); - } - - @Test - public void eachEventListenersShouldBeTriggeredOnEachNode() { - SimpleMailbox simpleMailbox = new SimpleMailbox(MAILBOX_PATH_1, 42); - simpleMailbox.setMailboxId(TestId.of(52)); - final MailboxListener.MailboxEvent event = new EventFactory().added(mailboxSession, new TreeMap<>(), simpleMailbox, EMPTY_MESSAGE_CACHE); - - broadcastDelegatingMailboxListener1.event(event); - - assertThat(eventCollectorEach1.getEvents()).hasSize(1); - assertThat(eventCollectorEach2.getEvents()).hasSize(1); - assertThat(eventCollectorEach3.getEvents()).hasSize(1); - } - -} http://git-wip-us.apache.org/repos/asf/james-project/blob/46c2a279/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListenerTest.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListenerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListenerTest.java deleted file mode 100644 index a0d6ac8..0000000 --- 
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/BroadcastDelegatingMailboxListenerTest.java +++ /dev/null @@ -1,206 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. 
* - ****************************************************************/ - -package org.apache.james.mailbox.store.event.distributed; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -import java.util.Optional; - -import org.apache.james.core.quota.QuotaCount; -import org.apache.james.core.quota.QuotaSize; -import org.apache.james.mailbox.MailboxListener; -import org.apache.james.mailbox.MailboxSession; -import org.apache.james.mailbox.mock.MockMailboxSession; -import org.apache.james.mailbox.model.MailboxPath; -import org.apache.james.mailbox.model.QuotaRoot; -import org.apache.james.mailbox.store.event.EventSerializer; -import org.apache.james.mailbox.store.publisher.MessageConsumer; -import org.apache.james.mailbox.store.publisher.Publisher; -import org.apache.james.mailbox.store.publisher.Topic; -import org.apache.james.mailbox.util.EventCollector; -import org.junit.Before; -import org.junit.Test; - -public class BroadcastDelegatingMailboxListenerTest { - - private static final MailboxPath MAILBOX_PATH = new MailboxPath("namespace", "user", "name"); - private static final MailboxPath MAILBOX_PATH_NEW = new MailboxPath("namespace_new", "user_new", "name_new"); - private static final Topic TOPIC = new Topic("topic"); - private static final byte[] BYTES = new byte[0]; - - private BroadcastDelegatingMailboxListener broadcastDelegatingMailboxListener; - private Publisher mockedPublisher; - private EventSerializer mockedEventSerializer; - private EventCollector mailboxEventCollector; - private EventCollector eachEventCollector; - private EventCollector onceEventCollector; - private MailboxSession mailboxSession; - private MailboxListener.MailboxEvent event; - - @Before - public void setUp() throws Exception { - mailboxSession = new MockMailboxSession("benwa"); - event = new 
MailboxListener.MailboxEvent(mailboxSession, MAILBOX_PATH) {}; - - mockedEventSerializer = mock(EventSerializer.class); - mockedPublisher = mock(Publisher.class); - MessageConsumer messageConsumer = mock(MessageConsumer.class); - broadcastDelegatingMailboxListener = new BroadcastDelegatingMailboxListener(mockedPublisher, messageConsumer, mockedEventSerializer, TOPIC.getValue()); - mailboxEventCollector = new EventCollector(MailboxListener.ListenerType.MAILBOX); - eachEventCollector = new EventCollector(MailboxListener.ListenerType.EACH_NODE); - onceEventCollector = new EventCollector(MailboxListener.ListenerType.ONCE); - } - - @Test - public void eventWithNoRegisteredListenersShouldWork() throws Exception { - when(mockedEventSerializer.serializeEvent(event)) - .thenReturn(BYTES); - - broadcastDelegatingMailboxListener.event(event); - - verify(mockedEventSerializer).serializeEvent(event); - verify(mockedPublisher).publish(TOPIC, BYTES); - verifyNoMoreInteractions(mockedEventSerializer, mockedPublisher); - } - - @Test - public void eventWithMailboxRegisteredListenerShouldWork() throws Exception { - broadcastDelegatingMailboxListener.addListener(MAILBOX_PATH, mailboxEventCollector, mailboxSession); - when(mockedEventSerializer.serializeEvent(event)).thenReturn(BYTES); - - broadcastDelegatingMailboxListener.event(event); - - assertThat(mailboxEventCollector.getEvents()).isEmpty(); - verify(mockedEventSerializer).serializeEvent(event); - verify(mockedPublisher).publish(TOPIC, BYTES); - verifyNoMoreInteractions(mockedEventSerializer, mockedPublisher); - } - - @Test - public void eventWithEachRegisteredListenerShouldWork() throws Exception { - broadcastDelegatingMailboxListener.addGlobalListener(eachEventCollector, mailboxSession); - when(mockedEventSerializer.serializeEvent(event)).thenReturn(BYTES); - - broadcastDelegatingMailboxListener.event(event); - assertThat(eachEventCollector.getEvents()).isEmpty(); - - verify(mockedEventSerializer).serializeEvent(event); - 
verify(mockedPublisher).publish(TOPIC, BYTES); - verifyNoMoreInteractions(mockedEventSerializer, mockedPublisher); - } - - @Test - public void eventWithOnceRegisteredListenerShouldWork() throws Exception { - broadcastDelegatingMailboxListener.addGlobalListener(onceEventCollector, mailboxSession); - when(mockedEventSerializer.serializeEvent(event)).thenReturn(BYTES); - - broadcastDelegatingMailboxListener.event(event); - - assertThat(onceEventCollector.getEvents()).containsOnly(event); - verify(mockedEventSerializer).serializeEvent(event); - verify(mockedPublisher).publish(TOPIC, BYTES); - verifyNoMoreInteractions(mockedEventSerializer, mockedPublisher); - } - - @Test - public void receiveSerializedEventShouldWorkWithNoRegisteredListeners() throws Exception { - when(mockedEventSerializer.deSerializeEvent(BYTES)).thenReturn(event); - - broadcastDelegatingMailboxListener.receiveSerializedEvent(BYTES); - - verify(mockedEventSerializer).deSerializeEvent(BYTES); - verifyNoMoreInteractions(mockedEventSerializer, mockedPublisher); - } - - @Test - public void receiveSerializedEventShouldWorkWithMailboxRegisteredListeners() throws Exception { - broadcastDelegatingMailboxListener.addListener(MAILBOX_PATH, mailboxEventCollector, mailboxSession); - when(mockedEventSerializer.deSerializeEvent(BYTES)).thenReturn(event); - - broadcastDelegatingMailboxListener.receiveSerializedEvent(BYTES); - - verify(mockedEventSerializer).deSerializeEvent(BYTES); - verifyNoMoreInteractions(mockedEventSerializer, mockedPublisher); - assertThat(mailboxEventCollector.getEvents()).containsOnly(event); - } - - @Test - public void receiveSerializedEventShouldWorkWithEachRegisteredListeners() throws Exception { - broadcastDelegatingMailboxListener.addGlobalListener(eachEventCollector, mailboxSession); - when(mockedEventSerializer.deSerializeEvent(BYTES)).thenReturn(event); - - broadcastDelegatingMailboxListener.receiveSerializedEvent(BYTES); - - verify(mockedEventSerializer).deSerializeEvent(BYTES); - 
verifyNoMoreInteractions(mockedEventSerializer, mockedPublisher); - assertThat(eachEventCollector.getEvents()).containsOnly(event); - } - - @Test - public void receiveSerializedEventShouldWorkWithOnceRegisteredListeners() throws Exception { - broadcastDelegatingMailboxListener.addGlobalListener(onceEventCollector, mailboxSession); - when(mockedEventSerializer.deSerializeEvent(BYTES)).thenReturn(event); - - broadcastDelegatingMailboxListener.receiveSerializedEvent(BYTES); - - verify(mockedEventSerializer).deSerializeEvent(BYTES); - verifyNoMoreInteractions(mockedEventSerializer, mockedPublisher); - assertThat(onceEventCollector.getEvents()).isEmpty(); - } - - @Test - public void deletionDistantEventsShouldBeWellHandled() throws Exception { - QuotaRoot quotaRoot = QuotaRoot.quotaRoot("root", Optional.empty()); - QuotaCount quotaCount = QuotaCount.count(123); - QuotaSize quotaSize = QuotaSize.size(456); - MailboxListener.MailboxEvent event = new MailboxListener.MailboxDeletion(mailboxSession, MAILBOX_PATH, quotaRoot, quotaCount, quotaSize); - broadcastDelegatingMailboxListener.addListener(MAILBOX_PATH, mailboxEventCollector, mailboxSession); - when(mockedEventSerializer.deSerializeEvent(BYTES)).thenReturn(event); - - broadcastDelegatingMailboxListener.receiveSerializedEvent(BYTES); - - verify(mockedEventSerializer).deSerializeEvent(BYTES); - verifyNoMoreInteractions(mockedEventSerializer, mockedPublisher); - assertThat(mailboxEventCollector.getEvents()).containsOnly(event); - } - - @Test - public void renameDistantEventsShouldBeWellHandled() throws Exception { - final MailboxListener.MailboxEvent event = new MailboxListener.MailboxRenamed(mailboxSession, MAILBOX_PATH) { - @Override - public MailboxPath getNewPath() { - return MAILBOX_PATH_NEW; - } - }; - when(mockedEventSerializer.deSerializeEvent(BYTES)).thenReturn(event); - - broadcastDelegatingMailboxListener.addListener(MAILBOX_PATH, mailboxEventCollector, mailboxSession); - 
broadcastDelegatingMailboxListener.receiveSerializedEvent(BYTES); - - verify(mockedEventSerializer).deSerializeEvent(BYTES); - verifyNoMoreInteractions(mockedEventSerializer, mockedPublisher); - assertThat(mailboxEventCollector.getEvents()).containsOnly(event); - } - -} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org