Author: btellier
Date: Sat Nov 28 13:04:34 2015
New Revision: 1716960
URL: http://svn.apache.org/viewvc?rev=1716960&view=rev
Log:
MAILBOX-211 Add a registration system based on MailboxPaths and a Cassandra
based implementation
Added:
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraRegistrationModule.java
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxPathRegisterTable.java
james/project/trunk/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/
james/project/trunk/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/distributed/
james/project/trunk/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegistrerMapperTest.java
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegister.java
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterMapper.java
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/MailboxPathRegister.java
james/project/trunk/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/
james/project/trunk/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterTest.java
Modified:
james/project/trunk/backends-common/cassandra/src/main/resources/META-INF/cassandra-session.xml
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
Modified:
james/project/trunk/backends-common/cassandra/src/main/resources/META-INF/cassandra-session.xml
URL:
http://svn.apache.org/viewvc/james/project/trunk/backends-common/cassandra/src/main/resources/META-INF/cassandra-session.xml?rev=1716960&r1=1716959&r2=1716960&view=diff
==============================================================================
---
james/project/trunk/backends-common/cassandra/src/main/resources/META-INF/cassandra-session.xml
(original)
+++
james/project/trunk/backends-common/cassandra/src/main/resources/META-INF/cassandra-session.xml
Sat Nov 28 13:04:34 2015
@@ -67,6 +67,10 @@
class="org.apache.james.mailbox.cassandra.modules.CassandraQuotaModule"
lazy-init="true"/>
+ <bean id="cassandra-mailbox-registration-module"
+
class="org.apache.james.mailbox.cassandra.modules.CassandraRegistrationModule"
+ lazy-init="true"/>
+
<bean id="cassandra-rrt-module"
class="org.apache.james.rrt.cassandra.CassandraRRTModule"
lazy-init="true"/>
@@ -86,6 +90,7 @@
<list>
<ref bean="cassandra-mailbox-uid-modseq-module"/>
<ref bean="cassandra-subscription-module"/>
+ <ref bean="cassandra-mailbox-registration-module"/>
<ref bean="cassandra-mailbox-module"/>
<ref bean="cassandra-message-module"/>
<ref bean="cassandra-mailbox-counters-module"/>
Added:
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java
URL:
http://svn.apache.org/viewvc/james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java?rev=1716960&view=auto
==============================================================================
---
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java
(added)
+++
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java
Sat Nov 28 13:04:34 2015
@@ -0,0 +1,91 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.event.distributed;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.ttl;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.UDTValue;
+import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import
org.apache.james.mailbox.cassandra.table.CassandraMailboxPathRegisterTable;
+import org.apache.james.mailbox.model.MailboxPath;
+import
org.apache.james.mailbox.store.event.distributed.DistantMailboxPathRegisterMapper;
+import org.apache.james.mailbox.store.publisher.Topic;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CassandraMailboxPathRegisterMapper implements
DistantMailboxPathRegisterMapper {
+
+ private final Session session;
+ private final CassandraTypesProvider typesProvider;
+ private final int cassandraTimeOutInS;
+ private final PreparedStatement insertStatement;
+ private final PreparedStatement deleteStatement;
+ private final PreparedStatement selectStatement;
+
+ public CassandraMailboxPathRegisterMapper(Session session,
CassandraTypesProvider typesProvider, int cassandraTimeOutInS) {
+ this.session = session;
+ this.typesProvider = typesProvider;
+ this.cassandraTimeOutInS = cassandraTimeOutInS;
+ this.insertStatement =
session.prepare(insertInto(CassandraMailboxPathRegisterTable.TABLE_NAME)
+ .value(CassandraMailboxPathRegisterTable.MAILBOX_PATH,
bindMarker())
+ .value(CassandraMailboxPathRegisterTable.TOPIC, bindMarker())
+ .using(ttl(bindMarker())));
+ this.deleteStatement =
session.prepare(delete().from(CassandraMailboxPathRegisterTable.TABLE_NAME)
+ .where(eq(CassandraMailboxPathRegisterTable.MAILBOX_PATH,
bindMarker()))
+ .and(eq(CassandraMailboxPathRegisterTable.TOPIC, bindMarker())));
+ this.selectStatement =
session.prepare(select().from(CassandraMailboxPathRegisterTable.TABLE_NAME)
+ .where(eq(CassandraMailboxPathRegisterTable.MAILBOX_PATH,
bindMarker())));
+ }
+
+ @Override
+ public Set<Topic> getTopics(MailboxPath mailboxPath) {
+ return
CassandraUtils.convertToStream(session.execute(selectStatement.bind(buildUDTFromMailboxPath(mailboxPath))))
+ .map(row -> new
Topic(row.getString(CassandraMailboxPathRegisterTable.TOPIC)))
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public void doRegister(MailboxPath mailboxPath, Topic topic) {
+
session.execute(insertStatement.bind(buildUDTFromMailboxPath(mailboxPath),
topic.getValue(), cassandraTimeOutInS));
+ }
+
+ @Override
+ public void doUnRegister(MailboxPath mailboxPath, Topic topic) {
+
session.execute(deleteStatement.bind(buildUDTFromMailboxPath(mailboxPath),
topic.getValue()));
+ }
+
+ private UDTValue buildUDTFromMailboxPath(MailboxPath path) {
+ return
typesProvider.getDefinedUserType(CassandraMailboxPathRegisterTable.MAILBOX_PATH)
+ .newValue()
+
.setString(CassandraMailboxPathRegisterTable.MailboxPath.NAMESPACE,
path.getNamespace())
+ .setString(CassandraMailboxPathRegisterTable.MailboxPath.USER,
path.getUser())
+ .setString(CassandraMailboxPathRegisterTable.MailboxPath.NAME,
path.getName());
+ }
+
+}
Modified:
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
URL:
http://svn.apache.org/viewvc/james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java?rev=1716960&r1=1716959&r2=1716960&view=diff
==============================================================================
---
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
(original)
+++
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
Sat Nov 28 13:04:34 2015
@@ -32,13 +32,13 @@ import org.apache.james.backends.cassand
import
org.apache.james.backends.cassandra.utils.LightweightTransactionException;
import org.apache.james.mailbox.cassandra.CassandraId;
import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
-import
org.apache.james.mailbox.cassandra.mail.utils.SimpleMailboxACLJsonConverter;
import org.apache.james.mailbox.cassandra.table.CassandraACLTable;
import org.apache.james.mailbox.cassandra.table.CassandraMailboxTable;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.exception.UnsupportedRightException;
import org.apache.james.mailbox.model.MailboxACL;
import org.apache.james.mailbox.model.SimpleMailboxACL;
+import org.apache.james.mailbox.store.json.SimpleMailboxACLJsonConverter;
import org.apache.james.mailbox.store.mail.model.Mailbox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Added:
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraRegistrationModule.java
URL:
http://svn.apache.org/viewvc/james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraRegistrationModule.java?rev=1716960&view=auto
==============================================================================
---
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraRegistrationModule.java
(added)
+++
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraRegistrationModule.java
Sat Nov 28 13:04:34 2015
@@ -0,0 +1,74 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.modules;
+
+import com.datastax.driver.core.schemabuilder.SchemaBuilder;
+import org.apache.james.backends.cassandra.components.CassandraIndex;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.components.CassandraTable;
+import org.apache.james.backends.cassandra.components.CassandraType;
+import org.apache.james.mailbox.cassandra.table.CassandraACLTable;
+import
org.apache.james.mailbox.cassandra.table.CassandraMailboxPathRegisterTable;
+
+import java.util.Collections;
+import java.util.List;
+
+import static com.datastax.driver.core.DataType.bigint;
+import static com.datastax.driver.core.DataType.text;
+import static com.datastax.driver.core.DataType.timeuuid;
+
+public class CassandraRegistrationModule implements CassandraModule {
+
+ private final List<CassandraTable> tables;
+ private final List<CassandraIndex> index;
+ private final List<CassandraType> types;
+
+ public CassandraRegistrationModule() {
+ tables = Collections.singletonList(
+ new CassandraTable(CassandraMailboxPathRegisterTable.TABLE_NAME,
+
SchemaBuilder.createTable(CassandraMailboxPathRegisterTable.TABLE_NAME)
+ .ifNotExists()
+
.addUDTPartitionKey(CassandraMailboxPathRegisterTable.MAILBOX_PATH,
SchemaBuilder.frozen(CassandraMailboxPathRegisterTable.MAILBOX_PATH))
+
.addClusteringColumn(CassandraMailboxPathRegisterTable.TOPIC, text())));
+ index = Collections.emptyList();
+ types = Collections.singletonList(
+ new CassandraType(CassandraMailboxPathRegisterTable.MAILBOX_PATH,
+
SchemaBuilder.createType(CassandraMailboxPathRegisterTable.MAILBOX_PATH)
+ .ifNotExists()
+
.addColumn(CassandraMailboxPathRegisterTable.MailboxPath.NAMESPACE, text())
+
.addColumn(CassandraMailboxPathRegisterTable.MailboxPath.NAME, text())
+
.addColumn(CassandraMailboxPathRegisterTable.MailboxPath.USER, text())));
+ }
+
+ @Override
+ public List<CassandraTable> moduleTables() {
+ return tables;
+ }
+
+ @Override
+ public List<CassandraIndex> moduleIndex() {
+ return index;
+ }
+
+ @Override
+ public List<CassandraType> moduleTypes() {
+ return types;
+ }
+}
Added:
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxPathRegisterTable.java
URL:
http://svn.apache.org/viewvc/james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxPathRegisterTable.java?rev=1716960&view=auto
==============================================================================
---
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxPathRegisterTable.java
(added)
+++
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxPathRegisterTable.java
Sat Nov 28 13:04:34 2015
@@ -0,0 +1,41 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.table;
+
+/**
+ * Created by benwa on 05/10/15.
+ * <p>
+ * Project under the Apache v 2 license
+ */
+public interface CassandraMailboxPathRegisterTable {
+
+ String TABLE_NAME = "mailboxPathRegister";
+
+ String MAILBOX_PATH = "mailboxPath";
+
+ String TOPIC = "topic";
+
+ interface MailboxPath {
+ String NAMESPACE = "namespace";
+ String USER = "user";
+ String NAME = "name";
+ }
+
+}
Added:
james/project/trunk/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegistrerMapperTest.java
URL:
http://svn.apache.org/viewvc/james/project/trunk/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegistrerMapperTest.java?rev=1716960&view=auto
==============================================================================
---
james/project/trunk/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegistrerMapperTest.java
(added)
+++
james/project/trunk/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegistrerMapperTest.java
Sat Nov 28 13:04:34 2015
@@ -0,0 +1,106 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.event.distributed;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.mailbox.cassandra.modules.CassandraRegistrationModule;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.store.publisher.Topic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CassandraMailboxPathRegistrerMapperTest {
+
+ private static final CassandraCluster cassandra =
CassandraCluster.create(new CassandraRegistrationModule());
+ private static final MailboxPath MAILBOX_PATH = new
MailboxPath("namespace", "user", "name");
+ private static final MailboxPath MAILBOX_PATH_2 = new
MailboxPath("namespace2", "user2", "name2");
+ private static final Topic TOPIC = new Topic("topic");
+ private static final int CASSANDRA_TIME_OUT_IN_S = 1;
+ private static final int CASSANDRA_TIME_OUT_IN_MS = 1000 *
CASSANDRA_TIME_OUT_IN_S;
+ private static final Topic TOPIC_2 = new Topic("topic2");
+
+ private CassandraMailboxPathRegisterMapper mapper;
+
+ @Before
+ public void setUp() {
+ mapper = new CassandraMailboxPathRegisterMapper(cassandra.getConf(),
cassandra.getTypesProvider(), CASSANDRA_TIME_OUT_IN_S);
+ }
+
+ @After
+ public void tearDown() {
+ cassandra.clearAllTables();
+ }
+
+ @Test
+ public void getTopicsShouldReturnEmptyResultByDefault() {
+ assertThat(mapper.getTopics(MAILBOX_PATH)).isEmpty();
+ }
+
+ @Test
+ public void doRegisterShouldWork() {
+ mapper.doRegister(MAILBOX_PATH, TOPIC);
+ assertThat(mapper.getTopics(MAILBOX_PATH)).containsOnly(TOPIC);
+ }
+
+ @Test
+ public void doRegisterShouldBeMailboxPathSpecific() {
+ mapper.doRegister(MAILBOX_PATH, TOPIC);
+ assertThat(mapper.getTopics(MAILBOX_PATH_2)).isEmpty();
+ }
+
+ @Test
+ public void doRegisterShouldAllowMultipleTopics() {
+ mapper.doRegister(MAILBOX_PATH, TOPIC);
+ mapper.doRegister(MAILBOX_PATH, TOPIC_2);
+ assertThat(mapper.getTopics(MAILBOX_PATH)).containsOnly(TOPIC,
TOPIC_2);
+ }
+
+ @Test
+ public void doUnRegisterShouldWork() {
+ mapper.doRegister(MAILBOX_PATH, TOPIC);
+ mapper.doUnRegister(MAILBOX_PATH, TOPIC);
+ assertThat(mapper.getTopics(MAILBOX_PATH)).isEmpty();
+ }
+
+ @Test
+ public void doUnregisterShouldBeMailboxSpecific() {
+ mapper.doRegister(MAILBOX_PATH, TOPIC);
+ mapper.doUnRegister(MAILBOX_PATH_2, TOPIC);
+ assertThat(mapper.getTopics(MAILBOX_PATH)).containsOnly(TOPIC);
+ }
+
+ @Test
+ public void doUnregisterShouldBeTopicSpecific() {
+ mapper.doRegister(MAILBOX_PATH, TOPIC);
+ mapper.doUnRegister(MAILBOX_PATH, TOPIC_2);
+ assertThat(mapper.getTopics(MAILBOX_PATH)).containsOnly(TOPIC);
+ }
+
+ @Test
+ public void entriesShouldExpire() throws Exception {
+ mapper.doRegister(MAILBOX_PATH, TOPIC);
+ Thread.sleep(2 * CASSANDRA_TIME_OUT_IN_MS);
+ assertThat(mapper.getTopics(MAILBOX_PATH)).isEmpty();
+ }
+
+}
Added:
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegister.java
URL:
http://svn.apache.org/viewvc/james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegister.java?rev=1716960&view=auto
==============================================================================
---
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegister.java
(added)
+++
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegister.java
Sat Nov 28 13:04:34 2015
@@ -0,0 +1,195 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.store.event.distributed;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.store.publisher.Topic;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DistantMailboxPathRegister implements MailboxPathRegister {
+ private static final int DEFAULT_MAX_RETRY = 1000;
+ private final ConcurrentHashMap<MailboxPath, Long>
registeredMailboxPathCount;
+ private final DistantMailboxPathRegisterMapper mapper;
+ private final Topic topic;
+ private final Timer timer;
+ private final int maxRetry;
+ private final long schedulerPeriodInS;
+
+ public DistantMailboxPathRegister(DistantMailboxPathRegisterMapper mapper,
long schedulerPeriodInS) {
+ this(mapper, DEFAULT_MAX_RETRY, schedulerPeriodInS);
+ }
+
+ public DistantMailboxPathRegister(DistantMailboxPathRegisterMapper mapper,
int maxRetry, long schedulerPeriodInS) {
+ this.maxRetry = maxRetry;
+ this.mapper = mapper;
+ this.registeredMailboxPathCount = new ConcurrentHashMap<MailboxPath,
Long>();
+ this.topic = new Topic(UUID.randomUUID().toString());
+ this.timer = new Timer();
+ this.schedulerPeriodInS = schedulerPeriodInS;
+ }
+
+ @PostConstruct
+ public void init() {
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ Set<Map.Entry<MailboxPath, Long>> snapshot =
ImmutableSet.copyOf(registeredMailboxPathCount.entrySet());
+ for(Map.Entry<MailboxPath, Long> entry : snapshot) {
+ if (entry.getValue() > 0) {
+ mapper.doRegister(entry.getKey(), topic);
+ }
+ }
+ }
+ }, 0L, schedulerPeriodInS * 1000);
+ }
+
+ @PreDestroy
+ public void destroy() {
+ timer.cancel();
+ timer.purge();
+ }
+
+ @Override
+ public Set<Topic> getTopics(MailboxPath mailboxPath) {
+ return mapper.getTopics(mailboxPath);
+ }
+
+ @Override
+ public Topic getLocalTopic() {
+ return topic;
+ }
+
+ @Override
+ public void register(MailboxPath path) throws MailboxException {
+ int count = 0;
+ boolean success = false;
+ while (count < maxRetry && !success) {
+ count ++;
+ success = tryRegister(path);
+ }
+ if (!success) {
+ throw new MailboxException(maxRetry + " reached while trying to
register " + path);
+ }
+ }
+
+ @Override
+ public void unregister(MailboxPath path) throws MailboxException {
+ int count = 0;
+ boolean success = false;
+ while (count < maxRetry && !success) {
+ count ++;
+ success = tryUnregister(path);
+ }
+ if (!success) {
+ throw new MailboxException(maxRetry + " reached while trying to
unregister " + path);
+ }
+ }
+
+ @Override
+ public void doCompleteUnRegister(MailboxPath mailboxPath) {
+ registeredMailboxPathCount.remove(mailboxPath);
+ mapper.doUnRegister(mailboxPath, topic);
+ }
+
+ @Override
+ public void doRename(MailboxPath oldPath, MailboxPath newPath) throws
MailboxException {
+ try {
+ int count = 0;
+ boolean success = false;
+ while (count < maxRetry && !success) {
+ success = tryCountTransfer(oldPath, newPath);
+ }
+ if (!success) {
+ throw new MailboxException(maxRetry + " reached while trying
to rename " + oldPath + " in " + newPath);
+ }
+ } finally {
+ doCompleteUnRegister(oldPath);
+ }
+ }
+
+ private boolean tryCountTransfer(MailboxPath oldPath, MailboxPath newPath)
throws MailboxException {
+ Long oldEntry = registeredMailboxPathCount.get(oldPath);
+ if (oldEntry == null) {
+ throw new MailboxException("Renamed entry does not exists");
+ }
+ Long entry = registeredMailboxPathCount.get(newPath);
+ if (entry != null) {
+ return registeredMailboxPathCount.replace(newPath, entry, oldEntry
+ entry);
+ } else {
+ if (registeredMailboxPathCount.putIfAbsent(newPath, oldEntry) ==
null) {
+ mapper.doRegister(newPath, topic);
+ return true;
+ }
+ return false;
+ }
+ }
+
+ private boolean tryRegister(MailboxPath path) {
+ Long entry = registeredMailboxPathCount.get(path);
+ Long newEntry = entry;
+ if (newEntry == null) {
+ newEntry = 0L;
+ }
+ newEntry++;
+ if (entry != null) {
+ return registeredMailboxPathCount.replace(path, entry, newEntry);
+ } else {
+ if (registeredMailboxPathCount.putIfAbsent(path, newEntry) ==
null) {
+ mapper.doRegister(path, topic);
+ return true;
+ }
+ return false;
+ }
+ }
+
+ private boolean tryUnregister(MailboxPath path) throws MailboxException {
+ Long entry = registeredMailboxPathCount.get(path);
+ Long newEntry = entry;
+ if (newEntry == null) {
+ throw new MailboxException("Removing a non registered
mailboxPath");
+ }
+ newEntry--;
+ if (newEntry != 0) {
+ return registeredMailboxPathCount.replace(path, entry, newEntry);
+ } else {
+ if (registeredMailboxPathCount.remove(path, entry)) {
+ mapper.doUnRegister(path, topic);
+ return true;
+ }
+ return false;
+ }
+ }
+
+ @VisibleForTesting
+ ConcurrentHashMap<MailboxPath, Long> getRegisteredMailboxPathCount() {
+ return registeredMailboxPathCount;
+ }
+}
Added:
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterMapper.java
URL:
http://svn.apache.org/viewvc/james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterMapper.java?rev=1716960&view=auto
==============================================================================
---
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterMapper.java
(added)
+++
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterMapper.java
Sat Nov 28 13:04:34 2015
@@ -0,0 +1,35 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.store.event.distributed;
+
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.store.publisher.Topic;
+
+import java.util.Set;
+
+public interface DistantMailboxPathRegisterMapper {
+
+ Set<Topic> getTopics(MailboxPath mailboxPath);
+
+ void doRegister(MailboxPath mailboxPath, Topic topic);
+
+ void doUnRegister(MailboxPath mailboxPath, Topic topic);
+
+}
Added:
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/MailboxPathRegister.java
URL:
http://svn.apache.org/viewvc/james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/MailboxPathRegister.java?rev=1716960&view=auto
==============================================================================
---
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/MailboxPathRegister.java
(added)
+++
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/MailboxPathRegister.java
Sat Nov 28 13:04:34 2015
@@ -0,0 +1,58 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.store.event.distributed;
+
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.store.publisher.Topic;
+
+import java.util.Set;
+
+/**
+ * The TopicDispatcher allow you to :
+ *
+ * - know to which queues you will need to send an event
+ * - get the topic this James instance will be pooling
+ */
+public interface MailboxPathRegister {
+
+ /**
+ * Given a MailboxPath, we want to get the different topics we need to
send the event to.
+ *
+ * @param mailboxPath MailboxPath
+ * @return List of topics concerned by this MailboxPath
+ */
+ Set<Topic> getTopics(MailboxPath mailboxPath);
+
+ /**
+ * Get the topic this James instance will consume
+ *
+ * @return The topic this James instance will consume
+ */
+ Topic getLocalTopic();
+
+ void register(MailboxPath path) throws MailboxException;
+
+ void unregister(MailboxPath path) throws MailboxException;
+
+ void doCompleteUnRegister(MailboxPath mailboxPath);
+
+ void doRename(MailboxPath oldPath, MailboxPath newPath) throws
MailboxException;
+}
\ No newline at end of file
Added:
james/project/trunk/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterTest.java
URL:
http://svn.apache.org/viewvc/james/project/trunk/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterTest.java?rev=1716960&view=auto
==============================================================================
---
james/project/trunk/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterTest.java
(added)
+++
james/project/trunk/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterTest.java
Sat Nov 28 13:04:34 2015
@@ -0,0 +1,364 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.store.event.distributed;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import static org.assertj.core.api.Assertions.fail;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Sets;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class DistantMailboxPathRegisterTest {
+
+ private static final MailboxPath MAILBOX_PATH = new
MailboxPath("namespace", "user", "name");
+ private static final MailboxPath NEW_MAILBOX_PATH = new
MailboxPath("namespace_new", "user_new", "name_new");
+ private static final String TOPIC = "topic";
+
+ private DistantMailboxPathRegisterMapper mockedMapper;
+ private DistantMailboxPathRegister register;
+
+ @Before
+ public void setUp() {
+ mockedMapper = mock(DistantMailboxPathRegisterMapper.class);
+ register = new DistantMailboxPathRegister(mockedMapper, 1);
+ }
+
+ @Test(expected = MailboxException.class)
+ public void doRenameShouldThrowIfTryingToRenameNonExistingPath() throws
Exception {
+ register.doRename(MAILBOX_PATH, NEW_MAILBOX_PATH);
+ verifyNoMoreInteractions(mockedMapper);
+ }
+
+ @Test
+ public void getTopicsShouldWork() {
+ final HashSet<String> result = Sets.newHashSet(TOPIC);
+ when(mockedMapper.getTopics(MAILBOX_PATH)).thenAnswer(new
Answer<Set<String>>() {
+ @Override
+ public Set<String> answer(InvocationOnMock invocationOnMock)
throws Throwable {
+ return result;
+ }
+ });
+ assertThat(register.getTopics(MAILBOX_PATH)).isEqualTo(result);
+ }
+
+ @Test
+ public void registerShouldWork() throws MailboxException {
+ register.register(MAILBOX_PATH);
+ verify(mockedMapper).doRegister(MAILBOX_PATH,
register.getLocalTopic());
+ verifyNoMoreInteractions(mockedMapper);
+ }
+
+ @Test
+ public void registerShouldCallMapperOnce() throws MailboxException {
+ register.register(MAILBOX_PATH);
+ verify(mockedMapper).doRegister(MAILBOX_PATH,
register.getLocalTopic());
+ register.register(MAILBOX_PATH);
+ verifyNoMoreInteractions(mockedMapper);
+ }
+
+ @Test
+ public void unregisterShouldWork() throws MailboxException {
+ register.register(MAILBOX_PATH);
+ verify(mockedMapper).doRegister(MAILBOX_PATH,
register.getLocalTopic());
+ register.unregister(MAILBOX_PATH);
+ verify(mockedMapper).doUnRegister(MAILBOX_PATH,
register.getLocalTopic());
+ verifyNoMoreInteractions(mockedMapper);
+ }
+
+
+ @Test
+ public void unregisterShouldNotCallMapperIfListenersAreStillPresent()
throws MailboxException {
+ register.register(MAILBOX_PATH);
+ verify(mockedMapper).doRegister(MAILBOX_PATH,
register.getLocalTopic());
+ register.register(MAILBOX_PATH);
+ register.unregister(MAILBOX_PATH);
+ verifyNoMoreInteractions(mockedMapper);
+ verifyNoMoreInteractions(mockedMapper);
+ }
+
+ @Test
+ public void unregisterShouldWorkWhenMultipleListenersWereRegistered()
throws MailboxException {
+ register.register(MAILBOX_PATH);
+ verify(mockedMapper).doRegister(MAILBOX_PATH,
register.getLocalTopic());
+ register.register(MAILBOX_PATH);
+ register.unregister(MAILBOX_PATH);
+ verifyNoMoreInteractions(mockedMapper);
+ register.unregister(MAILBOX_PATH);
+ verify(mockedMapper).doUnRegister(MAILBOX_PATH,
register.getLocalTopic());
+ verifyNoMoreInteractions(mockedMapper);
+ }
+
+ @Test
+ public void doRenameShouldWork() throws Exception {
+ register.register(MAILBOX_PATH);
+ verify(mockedMapper).doRegister(MAILBOX_PATH,
register.getLocalTopic());
+ register.doRename(MAILBOX_PATH, NEW_MAILBOX_PATH);
+ verify(mockedMapper).doRegister(NEW_MAILBOX_PATH,
register.getLocalTopic());
+ verify(mockedMapper).doUnRegister(MAILBOX_PATH,
register.getLocalTopic());
+
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(NEW_MAILBOX_PATH,
1L);
+ verifyNoMoreInteractions(mockedMapper);
+ }
+
+ @Test
+ public void doRenameShouldWorkWhenEntryAlreadyExists() throws Exception {
+ register.register(MAILBOX_PATH);
+ verify(mockedMapper).doRegister(MAILBOX_PATH,
register.getLocalTopic());
+ register.register(NEW_MAILBOX_PATH);
+ verify(mockedMapper).doRegister(NEW_MAILBOX_PATH,
register.getLocalTopic());
+ register.doRename(MAILBOX_PATH, NEW_MAILBOX_PATH);
+ verify(mockedMapper).doUnRegister(MAILBOX_PATH,
register.getLocalTopic());
+
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(NEW_MAILBOX_PATH,
2L);
+ verifyNoMoreInteractions(mockedMapper);
+ }
+
+ @Test
+ public void mapShouldBeEmptyInitially() {
+ assertThat(register.getRegisteredMailboxPathCount()).isEmpty();
+ }
+
+ @Test
+ public void mapShouldContainOneListenerOnPathAfterRegister() throws
MailboxException {
+ register.register(MAILBOX_PATH);
+
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(MAILBOX_PATH,
1L);
+ }
+
+ @Test
+ public void mapShouldContainTwoListenerOnPathAfterTwoRegister() throws
MailboxException {
+ register.register(MAILBOX_PATH);
+ register.register(MAILBOX_PATH);
+
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(MAILBOX_PATH,
2L);
+ }
+
+ @Test
+ public void mapListenerCountShouldBeOkAfterTwoRegisterAndOneUnregister()
throws MailboxException {
+ register.register(MAILBOX_PATH);
+ register.register(MAILBOX_PATH);
+ register.unregister(MAILBOX_PATH);
+
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(MAILBOX_PATH,
1L);
+ }
+
+ @Test
+ public void
mapListenerCountShouldBeEmptyAfterTwoRegisterAndOneUnregister() throws
MailboxException {
+ register.register(MAILBOX_PATH);
+ register.unregister(MAILBOX_PATH);
+ assertThat(register.getRegisteredMailboxPathCount()).isEmpty();
+ }
+
+ @Test
+ public void mapListenerCountShouldBeEmptyAfterDoCompleteUnregister()
throws MailboxException {
+ register.register(MAILBOX_PATH);
+ register.doCompleteUnRegister(MAILBOX_PATH);
+ assertThat(register.getRegisteredMailboxPathCount()).isEmpty();
+ }
+
+ @Test
+ public void mapListenerCountShouldHandleRename() throws Exception {
+ register.register(MAILBOX_PATH);
+ register.doRename(MAILBOX_PATH, NEW_MAILBOX_PATH);
+
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(NEW_MAILBOX_PATH,
1L);
+ }
+
+ @Test
+ public void mapListenerCountShouldHandleRenameWhenEntryAlreadyExists()
throws Exception {
+ register.register(MAILBOX_PATH);
+ register.register(NEW_MAILBOX_PATH);
+ register.doRename(MAILBOX_PATH, NEW_MAILBOX_PATH);
+
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(NEW_MAILBOX_PATH,
2L);
+ }
+
+ @Test
+ public void registerShouldNotBeAffectedByMapperError() throws
MailboxException {
+ doThrow(new
RuntimeException()).when(mockedMapper).doRegister(MAILBOX_PATH,
register.getLocalTopic());
+ try {
+ register.register(MAILBOX_PATH);
+ fail("Register should have thrown");
+ } catch (RuntimeException e) {
+
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(MAILBOX_PATH,
1L);
+ }
+ }
+
+ @Test
+ public void unregisterShouldNotBeAffectedByMapperErrors() throws
MailboxException {
+ register.register(MAILBOX_PATH);
+ doThrow(new
RuntimeException()).when(mockedMapper).doUnRegister(MAILBOX_PATH,
register.getLocalTopic());
+ try {
+ register.unregister(MAILBOX_PATH);
+ fail("Register should have thrown");
+ } catch (RuntimeException e) {
+ assertThat(register.getRegisteredMailboxPathCount()).isEmpty();
+ }
+ }
+
+ @Test
+ public void renameShouldNotBeAffectedByMapperErrors() throws
MailboxException {
+ register.register(MAILBOX_PATH);
+ doThrow(new
RuntimeException()).when(mockedMapper).doRegister(NEW_MAILBOX_PATH,
register.getLocalTopic());
+ try {
+ register.doRename(MAILBOX_PATH, NEW_MAILBOX_PATH);
+ fail("Register should have thrown");
+ } catch (RuntimeException e) {
+
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(NEW_MAILBOX_PATH,
1L)
+ .doesNotContainKey(MAILBOX_PATH);
+ }
+ }
+
+ @Test
+ public void completeUnregisterShouldNotBeAffectedByMapperErrors() throws
MailboxException {
+ register.register(MAILBOX_PATH);
+ doThrow(new
RuntimeException()).when(mockedMapper).doUnRegister(MAILBOX_PATH,
register.getLocalTopic());
+ try {
+ register.doCompleteUnRegister(MAILBOX_PATH);
+ fail("Register should have thrown");
+ } catch (RuntimeException e) {
+ assertThat(register.getRegisteredMailboxPathCount()).isEmpty();
+ }
+ }
+
+ @Test
+ public void registerShouldWorkInAConcurrentEnvironment() throws Exception {
+ int numTask = 2;
+ final long increments = 100;
+ ExecutorService executorService =
Executors.newFixedThreadPool(numTask);
+ for (int i = 0; i < numTask; i++) {
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ int j = 0;
+ while (j < increments) {
+ register.register(MAILBOX_PATH);
+ j++;
+ }
+ } catch (Exception e) {
+ fail("Exception caught in thread", e);
+ }
+ }
+ });
+ }
+ executorService.shutdown();
+ executorService.awaitTermination(10, TimeUnit.SECONDS);
+
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(MAILBOX_PATH,
numTask * increments);
+ }
+
+ @Test
+ public void unregisterShouldWorkInAConcurrentEnvironment() throws
Exception {
+ int numTask = 2;
+ final long increments = 100;
+ for (int i = 0; i < numTask * increments; i++) {
+ register.register(MAILBOX_PATH);
+ }
+ ExecutorService executorService =
Executors.newFixedThreadPool(numTask);
+ for (int i = 0; i < numTask; i++) {
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ int j = 0;
+ while (j < increments) {
+ register.unregister(MAILBOX_PATH);
+ j++;
+ }
+ } catch (Exception e) {
+ fail("Exception caught in thread", e);
+ }
+ }
+ });
+ }
+ executorService.shutdown();
+ executorService.awaitTermination(10, TimeUnit.SECONDS);
+ assertThat(register.getRegisteredMailboxPathCount()).isEmpty();
+ }
+
+ @Test
+ public void
unregisterMixedWithRegisterShouldWorkInAConcurrentEnvironment() throws
Exception {
+ int numTask = 2;
+ final long increments = 100;
+ for (int i = 0; i < increments; i++) {
+ register.register(MAILBOX_PATH);
+ }
+ ExecutorService executorService = Executors.newFixedThreadPool(2*
numTask);
+ for (int i = 0; i < numTask; i++) {
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ int j = 0;
+ while (j < increments) {
+ register.register(MAILBOX_PATH);
+ j++;
+ }
+ } catch (Exception e) {
+ fail("Exception caught in thread", e);
+ }
+ }
+ });
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ int j = 0;
+ while (j < increments) {
+ register.unregister(MAILBOX_PATH);
+ j++;
+ }
+ } catch (Exception e) {
+ fail("Exception caught in thread", e);
+ }
+ }
+ });
+ }
+ executorService.shutdown();
+ executorService.awaitTermination(10, TimeUnit.SECONDS);
+
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(MAILBOX_PATH,
increments);
+ }
+
+ @Test
+ public void schedulerShouldWork() throws Exception {
+ register.register(MAILBOX_PATH);
+ try {
+ register.init();
+ Thread.sleep(1050);
+
+ } finally {
+ register.destroy();
+ }
+ verify(mockedMapper, times(3)).doRegister(MAILBOX_PATH,
register.getLocalTopic());
+ verifyNoMoreInteractions(mockedMapper);
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]