Author: btellier
Date: Sat Nov 28 13:04:34 2015
New Revision: 1716960

URL: http://svn.apache.org/viewvc?rev=1716960&view=rev
Log:
MAILBOX-211 Add a registration system based on MailboxPaths and a Cassandra 
based implementation

Added:
    
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/
    
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/
    
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java
    
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraRegistrationModule.java
    
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxPathRegisterTable.java
    
james/project/trunk/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/
    
james/project/trunk/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/distributed/
    
james/project/trunk/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegistrerMapperTest.java
    
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/
    
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegister.java
    
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterMapper.java
    
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/MailboxPathRegister.java
    
james/project/trunk/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/
    
james/project/trunk/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterTest.java
Modified:
    
james/project/trunk/backends-common/cassandra/src/main/resources/META-INF/cassandra-session.xml
    
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java

Modified: 
james/project/trunk/backends-common/cassandra/src/main/resources/META-INF/cassandra-session.xml
URL: 
http://svn.apache.org/viewvc/james/project/trunk/backends-common/cassandra/src/main/resources/META-INF/cassandra-session.xml?rev=1716960&r1=1716959&r2=1716960&view=diff
==============================================================================
--- 
james/project/trunk/backends-common/cassandra/src/main/resources/META-INF/cassandra-session.xml
 (original)
+++ 
james/project/trunk/backends-common/cassandra/src/main/resources/META-INF/cassandra-session.xml
 Sat Nov 28 13:04:34 2015
@@ -67,6 +67,10 @@
           
class="org.apache.james.mailbox.cassandra.modules.CassandraQuotaModule"
           lazy-init="true"/>
 
+    <bean id="cassandra-mailbox-registration-module"
+          
class="org.apache.james.mailbox.cassandra.modules.CassandraRegistrationModule"
+          lazy-init="true"/>
+
     <bean id="cassandra-rrt-module"
           class="org.apache.james.rrt.cassandra.CassandraRRTModule"
           lazy-init="true"/>
@@ -86,6 +90,7 @@
             <list>
                 <ref bean="cassandra-mailbox-uid-modseq-module"/>
                 <ref bean="cassandra-subscription-module"/>
+                <ref bean="cassandra-mailbox-registration-module"/>
                 <ref bean="cassandra-mailbox-module"/>
                 <ref bean="cassandra-message-module"/>
                 <ref bean="cassandra-mailbox-counters-module"/>

Added: 
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java
URL: 
http://svn.apache.org/viewvc/james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java?rev=1716960&view=auto
==============================================================================
--- 
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java
 (added)
+++ 
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegisterMapper.java
 Sat Nov 28 13:04:34 2015
@@ -0,0 +1,91 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.event.distributed;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.ttl;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.UDTValue;
+import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import 
org.apache.james.mailbox.cassandra.table.CassandraMailboxPathRegisterTable;
+import org.apache.james.mailbox.model.MailboxPath;
+import 
org.apache.james.mailbox.store.event.distributed.DistantMailboxPathRegisterMapper;
+import org.apache.james.mailbox.store.publisher.Topic;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CassandraMailboxPathRegisterMapper implements 
DistantMailboxPathRegisterMapper {
+
+    private final Session session;
+    private final CassandraTypesProvider typesProvider;
+    private final int cassandraTimeOutInS;
+    private final PreparedStatement insertStatement;
+    private final PreparedStatement deleteStatement;
+    private final PreparedStatement selectStatement;
+
+    public CassandraMailboxPathRegisterMapper(Session session, 
CassandraTypesProvider typesProvider, int cassandraTimeOutInS) {
+        this.session = session;
+        this.typesProvider = typesProvider;
+        this.cassandraTimeOutInS = cassandraTimeOutInS;
+        this.insertStatement = 
session.prepare(insertInto(CassandraMailboxPathRegisterTable.TABLE_NAME)
+            .value(CassandraMailboxPathRegisterTable.MAILBOX_PATH, 
bindMarker())
+            .value(CassandraMailboxPathRegisterTable.TOPIC, bindMarker())
+            .using(ttl(bindMarker())));
+        this.deleteStatement = 
session.prepare(delete().from(CassandraMailboxPathRegisterTable.TABLE_NAME)
+            .where(eq(CassandraMailboxPathRegisterTable.MAILBOX_PATH, 
bindMarker()))
+            .and(eq(CassandraMailboxPathRegisterTable.TOPIC, bindMarker())));
+        this.selectStatement = 
session.prepare(select().from(CassandraMailboxPathRegisterTable.TABLE_NAME)
+            .where(eq(CassandraMailboxPathRegisterTable.MAILBOX_PATH, 
bindMarker())));
+    }
+
+    @Override
+    public Set<Topic> getTopics(MailboxPath mailboxPath) {
+        return 
CassandraUtils.convertToStream(session.execute(selectStatement.bind(buildUDTFromMailboxPath(mailboxPath))))
+            .map(row -> new 
Topic(row.getString(CassandraMailboxPathRegisterTable.TOPIC)))
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public void doRegister(MailboxPath mailboxPath, Topic topic) {
+        
session.execute(insertStatement.bind(buildUDTFromMailboxPath(mailboxPath), 
topic.getValue(), cassandraTimeOutInS));
+    }
+
+    @Override
+    public void doUnRegister(MailboxPath mailboxPath, Topic topic) {
+        
session.execute(deleteStatement.bind(buildUDTFromMailboxPath(mailboxPath), 
topic.getValue()));
+    }
+
+    private UDTValue buildUDTFromMailboxPath(MailboxPath path) {
+        return 
typesProvider.getDefinedUserType(CassandraMailboxPathRegisterTable.MAILBOX_PATH)
+            .newValue()
+            
.setString(CassandraMailboxPathRegisterTable.MailboxPath.NAMESPACE, 
path.getNamespace())
+            .setString(CassandraMailboxPathRegisterTable.MailboxPath.USER, 
path.getUser())
+            .setString(CassandraMailboxPathRegisterTable.MailboxPath.NAME, 
path.getName());
+    }
+
+}

Modified: 
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
URL: 
http://svn.apache.org/viewvc/james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java?rev=1716960&r1=1716959&r2=1716960&view=diff
==============================================================================
--- 
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
 (original)
+++ 
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
 Sat Nov 28 13:04:34 2015
@@ -32,13 +32,13 @@ import org.apache.james.backends.cassand
 import 
org.apache.james.backends.cassandra.utils.LightweightTransactionException;
 import org.apache.james.mailbox.cassandra.CassandraId;
 import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
-import 
org.apache.james.mailbox.cassandra.mail.utils.SimpleMailboxACLJsonConverter;
 import org.apache.james.mailbox.cassandra.table.CassandraACLTable;
 import org.apache.james.mailbox.cassandra.table.CassandraMailboxTable;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.exception.UnsupportedRightException;
 import org.apache.james.mailbox.model.MailboxACL;
 import org.apache.james.mailbox.model.SimpleMailboxACL;
+import org.apache.james.mailbox.store.json.SimpleMailboxACLJsonConverter;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

Added: 
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraRegistrationModule.java
URL: 
http://svn.apache.org/viewvc/james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraRegistrationModule.java?rev=1716960&view=auto
==============================================================================
--- 
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraRegistrationModule.java
 (added)
+++ 
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraRegistrationModule.java
 Sat Nov 28 13:04:34 2015
@@ -0,0 +1,74 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.modules;
+
+import com.datastax.driver.core.schemabuilder.SchemaBuilder;
+import org.apache.james.backends.cassandra.components.CassandraIndex;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.components.CassandraTable;
+import org.apache.james.backends.cassandra.components.CassandraType;
+import org.apache.james.mailbox.cassandra.table.CassandraACLTable;
+import 
org.apache.james.mailbox.cassandra.table.CassandraMailboxPathRegisterTable;
+
+import java.util.Collections;
+import java.util.List;
+
+import static com.datastax.driver.core.DataType.bigint;
+import static com.datastax.driver.core.DataType.text;
+import static com.datastax.driver.core.DataType.timeuuid;
+
+public class CassandraRegistrationModule implements CassandraModule {
+
+    private final List<CassandraTable> tables;
+    private final List<CassandraIndex> index;
+    private final List<CassandraType> types;
+
+    public CassandraRegistrationModule() {
+        tables = Collections.singletonList(
+            new CassandraTable(CassandraMailboxPathRegisterTable.TABLE_NAME,
+                
SchemaBuilder.createTable(CassandraMailboxPathRegisterTable.TABLE_NAME)
+                    .ifNotExists()
+                    
.addUDTPartitionKey(CassandraMailboxPathRegisterTable.MAILBOX_PATH, 
SchemaBuilder.frozen(CassandraMailboxPathRegisterTable.MAILBOX_PATH))
+                    
.addClusteringColumn(CassandraMailboxPathRegisterTable.TOPIC, text())));
+        index = Collections.emptyList();
+        types = Collections.singletonList(
+            new CassandraType(CassandraMailboxPathRegisterTable.MAILBOX_PATH,
+                
SchemaBuilder.createType(CassandraMailboxPathRegisterTable.MAILBOX_PATH)
+                    .ifNotExists()
+                    
.addColumn(CassandraMailboxPathRegisterTable.MailboxPath.NAMESPACE, text())
+                    
.addColumn(CassandraMailboxPathRegisterTable.MailboxPath.NAME, text())
+                    
.addColumn(CassandraMailboxPathRegisterTable.MailboxPath.USER, text())));
+    }
+
+    @Override
+    public List<CassandraTable> moduleTables() {
+        return tables;
+    }
+
+    @Override
+    public List<CassandraIndex> moduleIndex() {
+        return index;
+    }
+
+    @Override
+    public List<CassandraType> moduleTypes() {
+        return types;
+    }
+}

Added: 
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxPathRegisterTable.java
URL: 
http://svn.apache.org/viewvc/james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxPathRegisterTable.java?rev=1716960&view=auto
==============================================================================
--- 
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxPathRegisterTable.java
 (added)
+++ 
james/project/trunk/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxPathRegisterTable.java
 Sat Nov 28 13:04:34 2015
@@ -0,0 +1,41 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.table;
+
+/**
+ * Created by benwa on 05/10/15.
+ * <p>
+ * Project under the Apache v 2 license
+ */
+public interface CassandraMailboxPathRegisterTable {
+
+    String TABLE_NAME = "mailboxPathRegister";
+
+    String MAILBOX_PATH = "mailboxPath";
+
+    String TOPIC = "topic";
+
+    interface MailboxPath {
+        String NAMESPACE = "namespace";
+        String USER = "user";
+        String NAME = "name";
+    }
+
+}

Added: 
james/project/trunk/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegistrerMapperTest.java
URL: 
http://svn.apache.org/viewvc/james/project/trunk/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegistrerMapperTest.java?rev=1716960&view=auto
==============================================================================
--- 
james/project/trunk/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegistrerMapperTest.java
 (added)
+++ 
james/project/trunk/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/event/distributed/CassandraMailboxPathRegistrerMapperTest.java
 Sat Nov 28 13:04:34 2015
@@ -0,0 +1,106 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.event.distributed;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.mailbox.cassandra.modules.CassandraRegistrationModule;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.store.publisher.Topic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CassandraMailboxPathRegistrerMapperTest {
+
+    private static final CassandraCluster cassandra = 
CassandraCluster.create(new CassandraRegistrationModule());
+    private static final MailboxPath MAILBOX_PATH = new 
MailboxPath("namespace", "user", "name");
+    private static final MailboxPath MAILBOX_PATH_2 = new 
MailboxPath("namespace2", "user2", "name2");
+    private static final Topic TOPIC = new Topic("topic");
+    private static final int CASSANDRA_TIME_OUT_IN_S = 1;
+    private static final int CASSANDRA_TIME_OUT_IN_MS = 1000 * 
CASSANDRA_TIME_OUT_IN_S;
+    private static final Topic TOPIC_2 = new Topic("topic2");
+
+    private CassandraMailboxPathRegisterMapper mapper;
+
+    @Before
+    public void setUp() {
+        mapper = new CassandraMailboxPathRegisterMapper(cassandra.getConf(), 
cassandra.getTypesProvider(), CASSANDRA_TIME_OUT_IN_S);
+    }
+
+    @After
+    public void tearDown() {
+        cassandra.clearAllTables();
+    }
+
+    @Test
+    public void getTopicsShouldReturnEmptyResultByDefault() {
+        assertThat(mapper.getTopics(MAILBOX_PATH)).isEmpty();
+    }
+
+    @Test
+    public void doRegisterShouldWork() {
+        mapper.doRegister(MAILBOX_PATH, TOPIC);
+        assertThat(mapper.getTopics(MAILBOX_PATH)).containsOnly(TOPIC);
+    }
+
+    @Test
+    public void doRegisterShouldBeMailboxPathSpecific() {
+        mapper.doRegister(MAILBOX_PATH, TOPIC);
+        assertThat(mapper.getTopics(MAILBOX_PATH_2)).isEmpty();
+    }
+
+    @Test
+    public void doRegisterShouldAllowMultipleTopics() {
+        mapper.doRegister(MAILBOX_PATH, TOPIC);
+        mapper.doRegister(MAILBOX_PATH, TOPIC_2);
+        assertThat(mapper.getTopics(MAILBOX_PATH)).containsOnly(TOPIC, 
TOPIC_2);
+    }
+
+    @Test
+    public void doUnRegisterShouldWork() {
+        mapper.doRegister(MAILBOX_PATH, TOPIC);
+        mapper.doUnRegister(MAILBOX_PATH, TOPIC);
+        assertThat(mapper.getTopics(MAILBOX_PATH)).isEmpty();
+    }
+
+    @Test
+    public void doUnregisterShouldBeMailboxSpecific() {
+        mapper.doRegister(MAILBOX_PATH, TOPIC);
+        mapper.doUnRegister(MAILBOX_PATH_2, TOPIC);
+        assertThat(mapper.getTopics(MAILBOX_PATH)).containsOnly(TOPIC);
+    }
+
+    @Test
+    public void doUnregisterShouldBeTopicSpecific() {
+        mapper.doRegister(MAILBOX_PATH, TOPIC);
+        mapper.doUnRegister(MAILBOX_PATH, TOPIC_2);
+        assertThat(mapper.getTopics(MAILBOX_PATH)).containsOnly(TOPIC);
+    }
+
+    @Test
+    public void entriesShouldExpire() throws Exception {
+        mapper.doRegister(MAILBOX_PATH, TOPIC);
+        Thread.sleep(2 * CASSANDRA_TIME_OUT_IN_MS);
+        assertThat(mapper.getTopics(MAILBOX_PATH)).isEmpty();
+    }
+
+}

Added: 
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegister.java
URL: 
http://svn.apache.org/viewvc/james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegister.java?rev=1716960&view=auto
==============================================================================
--- 
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegister.java
 (added)
+++ 
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegister.java
 Sat Nov 28 13:04:34 2015
@@ -0,0 +1,195 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.store.event.distributed;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.store.publisher.Topic;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DistantMailboxPathRegister implements MailboxPathRegister {
+    private static final int DEFAULT_MAX_RETRY = 1000;
+    private final ConcurrentHashMap<MailboxPath, Long> 
registeredMailboxPathCount;
+    private final DistantMailboxPathRegisterMapper mapper;
+    private final Topic topic;
+    private final Timer timer;
+    private final int maxRetry;
+    private final long schedulerPeriodInS;
+
+    public DistantMailboxPathRegister(DistantMailboxPathRegisterMapper mapper, 
long schedulerPeriodInS) {
+        this(mapper, DEFAULT_MAX_RETRY, schedulerPeriodInS);
+    }
+
+    public DistantMailboxPathRegister(DistantMailboxPathRegisterMapper mapper, 
int maxRetry, long schedulerPeriodInS) {
+        this.maxRetry = maxRetry;
+        this.mapper = mapper;
+        this.registeredMailboxPathCount = new ConcurrentHashMap<MailboxPath, 
Long>();
+        this.topic = new Topic(UUID.randomUUID().toString());
+        this.timer = new Timer();
+        this.schedulerPeriodInS = schedulerPeriodInS;
+    }
+
+    @PostConstruct
+    public void init() {
+        timer.schedule(new TimerTask() {
+            @Override
+            public void run() {
+                Set<Map.Entry<MailboxPath, Long>> snapshot = 
ImmutableSet.copyOf(registeredMailboxPathCount.entrySet());
+                for(Map.Entry<MailboxPath, Long> entry : snapshot) {
+                    if (entry.getValue() > 0) {
+                        mapper.doRegister(entry.getKey(), topic);
+                    }
+                }
+            }
+        }, 0L, schedulerPeriodInS * 1000);
+    }
+
+    @PreDestroy
+    public void destroy() {
+        timer.cancel();
+        timer.purge();
+    }
+
+    @Override
+    public Set<Topic> getTopics(MailboxPath mailboxPath) {
+        return mapper.getTopics(mailboxPath);
+    }
+
+    @Override
+    public Topic getLocalTopic() {
+        return topic;
+    }
+
+    @Override
+    public void register(MailboxPath path) throws MailboxException {
+        int count = 0;
+        boolean success = false;
+        while (count < maxRetry && !success) {
+            count ++;
+            success = tryRegister(path);
+        }
+        if (!success) {
+            throw new MailboxException(maxRetry + " reached while trying to 
register " + path);
+        }
+    }
+
+    @Override
+    public void unregister(MailboxPath path) throws MailboxException {
+        int count = 0;
+        boolean success = false;
+        while (count < maxRetry && !success) {
+            count ++;
+            success = tryUnregister(path);
+        }
+        if (!success) {
+            throw new MailboxException(maxRetry + " reached while trying to 
unregister " + path);
+        }
+    }
+
+    @Override
+    public void doCompleteUnRegister(MailboxPath mailboxPath) {
+        registeredMailboxPathCount.remove(mailboxPath);
+        mapper.doUnRegister(mailboxPath, topic);
+    }
+
+    @Override
+    public void doRename(MailboxPath oldPath, MailboxPath newPath) throws 
MailboxException {
+        try {
+            int count = 0;
+            boolean success = false;
+            while (count < maxRetry && !success) {
+                success = tryCountTransfer(oldPath, newPath);
+            }
+            if (!success) {
+                throw new MailboxException(maxRetry + " reached while trying 
to rename " + oldPath + " in " + newPath);
+            }
+        } finally {
+            doCompleteUnRegister(oldPath);
+        }
+    }
+
+    private boolean tryCountTransfer(MailboxPath oldPath, MailboxPath newPath) 
throws MailboxException {
+        Long oldEntry = registeredMailboxPathCount.get(oldPath);
+        if (oldEntry == null) {
+            throw new MailboxException("Renamed entry does not exists");
+        }
+        Long entry = registeredMailboxPathCount.get(newPath);
+        if (entry != null) {
+            return registeredMailboxPathCount.replace(newPath, entry, oldEntry 
+ entry);
+        } else {
+            if (registeredMailboxPathCount.putIfAbsent(newPath, oldEntry) == 
null) {
+                mapper.doRegister(newPath, topic);
+                return true;
+            }
+            return false;
+        }
+    }
+
+    private boolean tryRegister(MailboxPath path) {
+        Long entry = registeredMailboxPathCount.get(path);
+        Long newEntry = entry;
+        if (newEntry == null) {
+            newEntry = 0L;
+        }
+        newEntry++;
+        if (entry != null) {
+            return registeredMailboxPathCount.replace(path, entry, newEntry);
+        } else {
+            if (registeredMailboxPathCount.putIfAbsent(path, newEntry) == 
null) {
+                mapper.doRegister(path, topic);
+                return true;
+            }
+            return false;
+        }
+    }
+
+    private boolean tryUnregister(MailboxPath path) throws MailboxException {
+        Long entry = registeredMailboxPathCount.get(path);
+        Long newEntry = entry;
+        if (newEntry == null) {
+            throw new MailboxException("Removing a non registered 
mailboxPath");
+        }
+        newEntry--;
+        if (newEntry != 0) {
+            return registeredMailboxPathCount.replace(path, entry, newEntry);
+        } else {
+            if (registeredMailboxPathCount.remove(path, entry)) {
+                mapper.doUnRegister(path, topic);
+                return true;
+            }
+            return false;
+        }
+    }
+
+    @VisibleForTesting
+    ConcurrentHashMap<MailboxPath, Long> getRegisteredMailboxPathCount() {
+        return registeredMailboxPathCount;
+    }
+}

Added: 
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterMapper.java
URL: 
http://svn.apache.org/viewvc/james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterMapper.java?rev=1716960&view=auto
==============================================================================
--- 
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterMapper.java
 (added)
+++ 
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterMapper.java
 Sat Nov 28 13:04:34 2015
@@ -0,0 +1,35 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.store.event.distributed;
+
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.store.publisher.Topic;
+
+import java.util.Set;
+
+public interface DistantMailboxPathRegisterMapper {
+
+    Set<Topic> getTopics(MailboxPath mailboxPath);
+
+    void doRegister(MailboxPath mailboxPath, Topic topic);
+
+    void doUnRegister(MailboxPath mailboxPath, Topic topic);
+
+}

Added: 
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/MailboxPathRegister.java
URL: 
http://svn.apache.org/viewvc/james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/MailboxPathRegister.java?rev=1716960&view=auto
==============================================================================
--- 
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/MailboxPathRegister.java
 (added)
+++ 
james/project/trunk/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/distributed/MailboxPathRegister.java
 Sat Nov 28 13:04:34 2015
@@ -0,0 +1,58 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.store.event.distributed;
+
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.store.publisher.Topic;
+
+import java.util.Set;
+
+/**
+ * The TopicDispatcher allow you to :
+ *
+ *  - know to which queues you will need to send an event
+ *  - get the topic this James instance will be pooling
+ */
+public interface MailboxPathRegister {
+
+    /**
+     * Given a MailboxPath, we want to get the different topics we need to 
send the event to.
+     *
+     * @param mailboxPath MailboxPath
+     * @return List of topics concerned by this MailboxPath
+     */
+    Set<Topic> getTopics(MailboxPath mailboxPath);
+
+    /**
+     * Get the topic this James instance will consume
+     *
+     * @return The topic this James instance will consume
+     */
+    Topic getLocalTopic();
+
+    void register(MailboxPath path) throws MailboxException;
+
+    void unregister(MailboxPath path) throws MailboxException;
+
+    void doCompleteUnRegister(MailboxPath mailboxPath);
+
+    void doRename(MailboxPath oldPath, MailboxPath newPath) throws 
MailboxException;
+}
\ No newline at end of file

Added: 
james/project/trunk/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterTest.java
URL: 
http://svn.apache.org/viewvc/james/project/trunk/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterTest.java?rev=1716960&view=auto
==============================================================================
--- 
james/project/trunk/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterTest.java
 (added)
+++ 
james/project/trunk/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/distributed/DistantMailboxPathRegisterTest.java
 Sat Nov 28 13:04:34 2015
@@ -0,0 +1,364 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.store.event.distributed;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import static org.assertj.core.api.Assertions.fail;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Sets;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class DistantMailboxPathRegisterTest {
+
+    private static final MailboxPath MAILBOX_PATH = new 
MailboxPath("namespace", "user", "name");
+    private static final MailboxPath NEW_MAILBOX_PATH = new 
MailboxPath("namespace_new", "user_new", "name_new");
+    private static final String TOPIC = "topic";
+
+    private DistantMailboxPathRegisterMapper mockedMapper;
+    private DistantMailboxPathRegister register;
+
+    @Before
+    public void setUp() {
+        mockedMapper = mock(DistantMailboxPathRegisterMapper.class);
+        register = new DistantMailboxPathRegister(mockedMapper, 1);
+    }
+
+    @Test(expected = MailboxException.class)
+    public void doRenameShouldThrowIfTryingToRenameNonExistingPath() throws 
Exception {
+        register.doRename(MAILBOX_PATH, NEW_MAILBOX_PATH);
+        verifyNoMoreInteractions(mockedMapper);
+    }
+
+    @Test
+    public void getTopicsShouldWork() {
+        final HashSet<String> result = Sets.newHashSet(TOPIC);
+        when(mockedMapper.getTopics(MAILBOX_PATH)).thenAnswer(new 
Answer<Set<String>>() {
+            @Override
+            public Set<String> answer(InvocationOnMock invocationOnMock) 
throws Throwable {
+                return result;
+            }
+        });
+        assertThat(register.getTopics(MAILBOX_PATH)).isEqualTo(result);
+    }
+
+    @Test
+    public void registerShouldWork() throws MailboxException {
+        register.register(MAILBOX_PATH);
+        verify(mockedMapper).doRegister(MAILBOX_PATH, 
register.getLocalTopic());
+        verifyNoMoreInteractions(mockedMapper);
+    }
+
+    @Test
+    public void registerShouldCallMapperOnce() throws MailboxException {
+        register.register(MAILBOX_PATH);
+        verify(mockedMapper).doRegister(MAILBOX_PATH, 
register.getLocalTopic());
+        register.register(MAILBOX_PATH);
+        verifyNoMoreInteractions(mockedMapper);
+    }
+
+    @Test
+    public void unregisterShouldWork() throws MailboxException {
+        register.register(MAILBOX_PATH);
+        verify(mockedMapper).doRegister(MAILBOX_PATH, 
register.getLocalTopic());
+        register.unregister(MAILBOX_PATH);
+        verify(mockedMapper).doUnRegister(MAILBOX_PATH, 
register.getLocalTopic());
+        verifyNoMoreInteractions(mockedMapper);
+    }
+
+
+    @Test
+    public void unregisterShouldNotCallMapperIfListenersAreStillPresent() 
throws MailboxException {
+        register.register(MAILBOX_PATH);
+        verify(mockedMapper).doRegister(MAILBOX_PATH, 
register.getLocalTopic());
+        register.register(MAILBOX_PATH);
+        register.unregister(MAILBOX_PATH);
+        verifyNoMoreInteractions(mockedMapper);
+        verifyNoMoreInteractions(mockedMapper);
+    }
+
+    @Test
+    public void unregisterShouldWorkWhenMultipleListenersWereRegistered() 
throws MailboxException {
+        register.register(MAILBOX_PATH);
+        verify(mockedMapper).doRegister(MAILBOX_PATH, 
register.getLocalTopic());
+        register.register(MAILBOX_PATH);
+        register.unregister(MAILBOX_PATH);
+        verifyNoMoreInteractions(mockedMapper);
+        register.unregister(MAILBOX_PATH);
+        verify(mockedMapper).doUnRegister(MAILBOX_PATH, 
register.getLocalTopic());
+        verifyNoMoreInteractions(mockedMapper);
+    }
+
+    @Test
+    public void doRenameShouldWork() throws Exception {
+        register.register(MAILBOX_PATH);
+        verify(mockedMapper).doRegister(MAILBOX_PATH, 
register.getLocalTopic());
+        register.doRename(MAILBOX_PATH, NEW_MAILBOX_PATH);
+        verify(mockedMapper).doRegister(NEW_MAILBOX_PATH, 
register.getLocalTopic());
+        verify(mockedMapper).doUnRegister(MAILBOX_PATH, 
register.getLocalTopic());
+        
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(NEW_MAILBOX_PATH,
 1L);
+        verifyNoMoreInteractions(mockedMapper);
+    }
+
+    @Test
+    public void doRenameShouldWorkWhenEntryAlreadyExists() throws Exception {
+        register.register(MAILBOX_PATH);
+        verify(mockedMapper).doRegister(MAILBOX_PATH, 
register.getLocalTopic());
+        register.register(NEW_MAILBOX_PATH);
+        verify(mockedMapper).doRegister(NEW_MAILBOX_PATH, 
register.getLocalTopic());
+        register.doRename(MAILBOX_PATH, NEW_MAILBOX_PATH);
+        verify(mockedMapper).doUnRegister(MAILBOX_PATH, 
register.getLocalTopic());
+        
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(NEW_MAILBOX_PATH,
 2L);
+        verifyNoMoreInteractions(mockedMapper);
+    }
+
+    @Test
+    public void mapShouldBeEmptyInitially() {
+        assertThat(register.getRegisteredMailboxPathCount()).isEmpty();
+    }
+
+    @Test
+    public void mapShouldContainOneListenerOnPathAfterRegister() throws 
MailboxException {
+        register.register(MAILBOX_PATH);
+        
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(MAILBOX_PATH,
 1L);
+    }
+
+    @Test
+    public void mapShouldContainTwoListenerOnPathAfterTwoRegister() throws 
MailboxException {
+        register.register(MAILBOX_PATH);
+        register.register(MAILBOX_PATH);
+        
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(MAILBOX_PATH,
 2L);
+    }
+
+    @Test
+    public void mapListenerCountShouldBeOkAfterTwoRegisterAndOneUnregister() 
throws MailboxException {
+        register.register(MAILBOX_PATH);
+        register.register(MAILBOX_PATH);
+        register.unregister(MAILBOX_PATH);
+        
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(MAILBOX_PATH,
 1L);
+    }
+
+    @Test
+    public void 
mapListenerCountShouldBeEmptyAfterTwoRegisterAndOneUnregister() throws 
MailboxException {
+        register.register(MAILBOX_PATH);
+        register.unregister(MAILBOX_PATH);
+        assertThat(register.getRegisteredMailboxPathCount()).isEmpty();
+    }
+
+    @Test
+    public void mapListenerCountShouldBeEmptyAfterDoCompleteUnregister() 
throws MailboxException {
+        register.register(MAILBOX_PATH);
+        register.doCompleteUnRegister(MAILBOX_PATH);
+        assertThat(register.getRegisteredMailboxPathCount()).isEmpty();
+    }
+
+    @Test
+    public void mapListenerCountShouldHandleRename() throws Exception {
+        register.register(MAILBOX_PATH);
+        register.doRename(MAILBOX_PATH, NEW_MAILBOX_PATH);
+        
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(NEW_MAILBOX_PATH,
 1L);
+    }
+
+    @Test
+    public void mapListenerCountShouldHandleRenameWhenEntryAlreadyExists() 
throws Exception {
+        register.register(MAILBOX_PATH);
+        register.register(NEW_MAILBOX_PATH);
+        register.doRename(MAILBOX_PATH, NEW_MAILBOX_PATH);
+        
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(NEW_MAILBOX_PATH,
 2L);
+    }
+
+    @Test
+    public void registerShouldNotBeAffectedByMapperError() throws 
MailboxException {
+        doThrow(new 
RuntimeException()).when(mockedMapper).doRegister(MAILBOX_PATH, 
register.getLocalTopic());
+        try {
+            register.register(MAILBOX_PATH);
+            fail("Register should have thrown");
+        } catch (RuntimeException e) {
+            
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(MAILBOX_PATH,
 1L);
+        }
+    }
+
+    @Test
+    public void unregisterShouldNotBeAffectedByMapperErrors() throws 
MailboxException {
+        register.register(MAILBOX_PATH);
+        doThrow(new 
RuntimeException()).when(mockedMapper).doUnRegister(MAILBOX_PATH, 
register.getLocalTopic());
+        try {
+            register.unregister(MAILBOX_PATH);
+            fail("Register should have thrown");
+        } catch (RuntimeException e) {
+            assertThat(register.getRegisteredMailboxPathCount()).isEmpty();
+        }
+    }
+
+    @Test
+    public void renameShouldNotBeAffectedByMapperErrors() throws 
MailboxException {
+        register.register(MAILBOX_PATH);
+        doThrow(new 
RuntimeException()).when(mockedMapper).doRegister(NEW_MAILBOX_PATH, 
register.getLocalTopic());
+        try {
+            register.doRename(MAILBOX_PATH, NEW_MAILBOX_PATH);
+            fail("Register should have thrown");
+        } catch (RuntimeException e) {
+            
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(NEW_MAILBOX_PATH,
 1L)
+                .doesNotContainKey(MAILBOX_PATH);
+        }
+    }
+
+    @Test
+    public void completeUnregisterShouldNotBeAffectedByMapperErrors() throws 
MailboxException {
+        register.register(MAILBOX_PATH);
+        doThrow(new 
RuntimeException()).when(mockedMapper).doUnRegister(MAILBOX_PATH, 
register.getLocalTopic());
+        try {
+            register.doCompleteUnRegister(MAILBOX_PATH);
+            fail("Register should have thrown");
+        } catch (RuntimeException e) {
+            assertThat(register.getRegisteredMailboxPathCount()).isEmpty();
+        }
+    }
+
+    @Test
+    public void registerShouldWorkInAConcurrentEnvironment() throws Exception {
+        int numTask = 2;
+        final long increments = 100;
+        ExecutorService executorService = 
Executors.newFixedThreadPool(numTask);
+        for (int i = 0; i < numTask; i++) {
+            executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        int j = 0;
+                        while (j < increments) {
+                            register.register(MAILBOX_PATH);
+                            j++;
+                        }
+                    } catch (Exception e) {
+                        fail("Exception caught in thread", e);
+                    }
+                }
+            });
+        }
+        executorService.shutdown();
+        executorService.awaitTermination(10, TimeUnit.SECONDS);
+        
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(MAILBOX_PATH,
 numTask * increments);
+    }
+
+    @Test
+    public void unregisterShouldWorkInAConcurrentEnvironment() throws 
Exception {
+        int numTask = 2;
+        final long increments = 100;
+        for (int i = 0; i < numTask * increments; i++) {
+            register.register(MAILBOX_PATH);
+        }
+        ExecutorService executorService = 
Executors.newFixedThreadPool(numTask);
+        for (int i = 0; i < numTask; i++) {
+            executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        int j = 0;
+                        while (j < increments) {
+                            register.unregister(MAILBOX_PATH);
+                            j++;
+                        }
+                    } catch (Exception e) {
+                        fail("Exception caught in thread", e);
+                    }
+                }
+            });
+        }
+        executorService.shutdown();
+        executorService.awaitTermination(10, TimeUnit.SECONDS);
+        assertThat(register.getRegisteredMailboxPathCount()).isEmpty();
+    }
+
+    @Test
+    public void 
unregisterMixedWithRegisterShouldWorkInAConcurrentEnvironment() throws 
Exception {
+        int numTask = 2;
+        final long increments = 100;
+        for (int i = 0; i < increments; i++) {
+            register.register(MAILBOX_PATH);
+        }
+        ExecutorService executorService = Executors.newFixedThreadPool(2* 
numTask);
+        for (int i = 0; i < numTask; i++) {
+            executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        int j = 0;
+                        while (j < increments) {
+                            register.register(MAILBOX_PATH);
+                            j++;
+                        }
+                    } catch (Exception e) {
+                        fail("Exception caught in thread", e);
+                    }
+                }
+            });
+            executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        int j = 0;
+                        while (j < increments) {
+                            register.unregister(MAILBOX_PATH);
+                            j++;
+                        }
+                    } catch (Exception e) {
+                        fail("Exception caught in thread", e);
+                    }
+                }
+            });
+        }
+        executorService.shutdown();
+        executorService.awaitTermination(10, TimeUnit.SECONDS);
+        
assertThat(register.getRegisteredMailboxPathCount()).containsEntry(MAILBOX_PATH,
 increments);
+    }
+
+    @Test
+    public void schedulerShouldWork() throws Exception {
+        register.register(MAILBOX_PATH);
+        try {
+            register.init();
+            Thread.sleep(1050);
+
+        } finally {
+            register.destroy();
+        }
+        verify(mockedMapper, times(3)).doRegister(MAILBOX_PATH, 
register.getLocalTopic());
+        verifyNoMoreInteractions(mockedMapper);
+    }
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to