JAMES-1693 Add an in-memory implementation for mail queues and use it in the 
Guice in-memory implementation


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/f89a2abd
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/f89a2abd
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/f89a2abd

Branch: refs/heads/master
Commit: f89a2abd2d135b070e5ba0297c8f8685de0cbe2f
Parents: f13f173
Author: Benoit Tellier <btell...@linagora.com>
Authored: Wed Mar 9 14:49:35 2016 +0700
Committer: Matthieu Baechler <matthieu.baech...@linagora.com>
Committed: Wed Mar 23 17:07:13 2016 +0100

----------------------------------------------------------------------
 .../org/apache/james/MemoryJamesServerMain.java |   4 +-
 .../modules/server/MemoryMailQueueFactory.java  | 130 +++++++++++++++++++
 .../modules/server/MemoryMailQueueModule.java   |  47 +++++++
 .../server/MemoryMailQueueFactoryTest.java      | 122 +++++++++++++++++
 4 files changed, 301 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/f89a2abd/server/container/guice/memory-guice/src/main/java/org/apache/james/MemoryJamesServerMain.java
----------------------------------------------------------------------
diff --git 
a/server/container/guice/memory-guice/src/main/java/org/apache/james/MemoryJamesServerMain.java
 
b/server/container/guice/memory-guice/src/main/java/org/apache/james/MemoryJamesServerMain.java
index 991ba1e..b3dffbc 100644
--- 
a/server/container/guice/memory-guice/src/main/java/org/apache/james/MemoryJamesServerMain.java
+++ 
b/server/container/guice/memory-guice/src/main/java/org/apache/james/MemoryJamesServerMain.java
@@ -22,8 +22,8 @@ package org.apache.james;
 import org.apache.james.mailbox.inmemory.InMemoryId;
 import org.apache.james.modules.data.MemoryDataModule;
 import org.apache.james.modules.mailbox.MemoryMailboxModule;
-import org.apache.james.modules.server.ActiveMQQueueModule;
 import org.apache.james.modules.server.JMXServerModule;
+import org.apache.james.modules.server.MemoryMailQueueModule;
 import org.apache.james.modules.server.QuotaModule;
 
 import com.google.inject.Module;
@@ -38,7 +38,7 @@ public class MemoryJamesServerMain {
         new MemoryDataModule(),
         new MemoryMailboxModule(),
         new QuotaModule(),
-        new ActiveMQQueueModule<>(inMemoryId));
+        new MemoryMailQueueModule<>(inMemoryId));
 
     public static void main(String[] args) throws Exception {
         new GuiceJamesServer<>(inMemoryId)

http://git-wip-us.apache.org/repos/asf/james-project/blob/f89a2abd/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/server/MemoryMailQueueFactory.java
----------------------------------------------------------------------
diff --git 
a/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/server/MemoryMailQueueFactory.java
 
b/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/server/MemoryMailQueueFactory.java
new file mode 100644
index 0000000..39846ed
--- /dev/null
+++ 
b/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/server/MemoryMailQueueFactory.java
@@ -0,0 +1,130 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.modules.server;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.james.queue.api.MailQueue;
+import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
+import org.apache.mailet.Mail;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Throwables;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/** In-memory {@link MailQueueFactory} keeping one {@link MemoryMailQueue} per queue name. */
+@Singleton
+public class MemoryMailQueueFactory implements MailQueueFactory {
+
+    private final ConcurrentHashMap<String, MailQueue> mailQueues;
+    private final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory;
+
+    @Inject
+    public MemoryMailQueueFactory(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) {
+        this.mailQueues = new ConcurrentHashMap<>();
+        this.mailQueueItemDecoratorFactory = mailQueueItemDecoratorFactory;
+    }
+
+    /** Returns the queue registered under {@code name}, creating it on first use. */
+    @Override
+    public MailQueue getQueue(String name) {
+        // computeIfAbsent runs the creating lambda at most once per key, even under contention.
+        return mailQueues.computeIfAbsent(name,
+            key -> new MemoryMailQueue(key, mailQueueItemDecoratorFactory));
+    }
+
+    /** Mail queue backed by a blocking deque; two queues are equal when their names are equal. */
+    public static class MemoryMailQueue implements MailQueue {
+
+        private final LinkedBlockingDeque<MemoryMailQueueItem> mailItems;
+        private final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory;
+        private final String name;
+
+        public MemoryMailQueue(String name, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) {
+            this.mailItems = new LinkedBlockingDeque<>();
+            this.name = name;
+            this.mailQueueItemDecoratorFactory = mailQueueItemDecoratorFactory;
+        }
+
+        /** Enqueues immediately; NOTE(review): {@code delay} and {@code unit} are ignored. */
+        @Override
+        public void enQueue(Mail mail, long delay, TimeUnit unit) throws MailQueueException {
+            enQueue(mail);
+        }
+
+        /** Adds the mail at the head of the deque, so the newest mail is dequeued first. */
+        @Override
+        public void enQueue(Mail mail) throws MailQueueException {
+            mailItems.addFirst(new MemoryMailQueueItem(mail));
+        }
+
+        /**
+         * Blocks until a mail is available and returns it decorated. When interrupted, restores
+         * the interrupt flag and rethrows the {@link InterruptedException} wrapped, unchecked.
+         */
+        @Override
+        public MailQueueItem deQueue() throws MailQueueException {
+            try {
+                return mailQueueItemDecoratorFactory.decorate(mailItems.take());
+            } catch (InterruptedException e) {
+                // Wrapping hides the interruption from callers, so keep the flag set for them.
+                Thread.currentThread().interrupt();
+                throw Throwables.propagate(e);
+            }
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            return Objects.equal(this.name, ((MemoryMailQueue) o).name);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(name);
+        }
+    }
+
+    /** Queue item that only carries the mail; {@link #done(boolean)} does nothing. */
+    public static class MemoryMailQueueItem implements MailQueue.MailQueueItem {
+        private final Mail mail;
+
+        public MemoryMailQueueItem(Mail mail) {
+            this.mail = mail;
+        }
+
+        @Override
+        public Mail getMail() {
+            return mail;
+        }
+
+        @Override
+        public void done(boolean success) throws MailQueue.MailQueueException {
+            // Intentionally empty: this item keeps no state to update on completion.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/f89a2abd/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/server/MemoryMailQueueModule.java
----------------------------------------------------------------------
diff --git 
a/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/server/MemoryMailQueueModule.java
 
b/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/server/MemoryMailQueueModule.java
new file mode 100644
index 0000000..c41f101
--- /dev/null
+++ 
b/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/server/MemoryMailQueueModule.java
@@ -0,0 +1,47 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.modules.server;
+
+import org.apache.james.jmap.send.PostDequeueDecoratorFactory;
+import org.apache.james.mailbox.store.mail.model.MailboxId;
+import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
+import org.apache.james.utils.GuiceGenericType;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Singleton;
+import com.google.inject.TypeLiteral;
+
+/** Guice bindings for the in-memory mail queue and its {@code Id}-typed item decorator. */
+public class MemoryMailQueueModule<Id extends MailboxId> extends AbstractModule {
+
+    private final GuiceGenericType<Id> guiceGenericType;
+
+    public MemoryMailQueueModule(TypeLiteral<Id> type) {
+        this.guiceGenericType = new GuiceGenericType<>(type);
+    }
+
+    @Override
+    protected void configure() {
+        bind(MailQueueItemDecoratorFactory.class)
+            .to(guiceGenericType.newGenericType(PostDequeueDecoratorFactory.class)).in(Singleton.class);
+        bind(MailQueueFactory.class).to(MemoryMailQueueFactory.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/f89a2abd/server/container/guice/memory-guice/src/test/java/org/apache/james/server/MemoryMailQueueFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/server/container/guice/memory-guice/src/test/java/org/apache/james/server/MemoryMailQueueFactoryTest.java
 
b/server/container/guice/memory-guice/src/test/java/org/apache/james/server/MemoryMailQueueFactoryTest.java
new file mode 100644
index 0000000..560378f
--- /dev/null
+++ 
b/server/container/guice/memory-guice/src/test/java/org/apache/james/server/MemoryMailQueueFactoryTest.java
@@ -0,0 +1,122 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.james.modules.server.MemoryMailQueueFactory;
+import org.apache.james.queue.api.MailQueue;
+import org.apache.james.queue.api.MailQueue.MailQueueItem;
+import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
+import org.apache.mailet.Mail;
+import org.apache.mailet.base.test.FakeMail;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Throwables;
+
+/** Exercises {@link MemoryMailQueueFactory}: queue lookup by name and blocking dequeue. */
+public class MemoryMailQueueFactoryTest {
+    public static final String KEY = "key";
+    public static final String BIS = "bis";
+
+    private MemoryMailQueueFactory memoryMailQueueFactory;
+    private ExecutorService executorService;
+
+    @Before
+    public void setUp() {
+        executorService = Executors.newFixedThreadPool(2);
+        memoryMailQueueFactory = new MemoryMailQueueFactory(item -> new MailQueueItemDecoratorFactory.MailQueueItemDecorator(item) {
+            @Override
+            public Mail getMail() {
+                return item.getMail();
+            }
+            @Override
+            public void done(boolean success) throws MailQueue.MailQueueException {
+                // The pass-through decorator has nothing to do on completion.
+            }
+        });
+    }
+
+    @After
+    public void tearDown() {
+        executorService.shutdownNow();
+    }
+
+    @Test
+    public void getQueueShouldNotReturnNull() {
+        assertThat(memoryMailQueueFactory.getQueue(KEY)).isNotNull();
+    }
+
+    @Test
+    public void getQueueShouldReturnTwoTimeTheSameResultWhenUsedWithTheSameKey() {
+        assertThat(memoryMailQueueFactory.getQueue(KEY)).isEqualTo(memoryMailQueueFactory.getQueue(KEY));
+    }
+
+    @Test
+    public void getQueueShouldNotReturnTheSameQueueForTwoDifferentNames() {
+        assertThat(memoryMailQueueFactory.getQueue(KEY)).isNotEqualTo(memoryMailQueueFactory.getQueue(BIS));
+    }
+
+    @Test
+    public void dequeueShouldWork() throws Exception {
+        Mail enqueued = new FakeMail();
+        memoryMailQueueFactory.getQueue(KEY).enQueue(enqueued);
+        assertThat(memoryMailQueueFactory.getQueue(KEY).deQueue().getMail()).isEqualTo(enqueued);
+    }
+
+    @Test
+    public void dequeueShouldWorkWithMultipleMessages() throws Exception {
+        MailQueue queue = memoryMailQueueFactory.getQueue(KEY);
+        Mail older = new FakeMail();
+        Mail newer = new FakeMail();
+        queue.enQueue(older);
+        queue.enQueue(newer);
+        assertThat(queue.deQueue().getMail()).isEqualTo(newer); // last enqueued comes out first
+        assertThat(queue.deQueue().getMail()).isEqualTo(older);
+    }
+
+    @Test(timeout = 20000)
+    public void deQueueShouldWaitForAMailToBeEnqueued() throws Exception {
+        Mail expected = new FakeMail();
+        CountDownLatch mayEnqueue = new CountDownLatch(1);
+        executorService.submit(() -> {
+            try {
+                mayEnqueue.await();
+                memoryMailQueueFactory.getQueue(KEY).enQueue(expected);
+            } catch (Exception e) {
+                throw Throwables.propagate(e);
+            }
+        });
+        Future<MailQueueItem> pending = executorService.submit(() -> memoryMailQueueFactory.getQueue(KEY).deQueue());
+        assertThatThrownBy(() -> pending.get(100, TimeUnit.MILLISECONDS)).isInstanceOf(TimeoutException.class);
+        mayEnqueue.countDown(); // only now may the producer enqueue
+        assertThat(pending.get().getMail()).isEqualTo(expected);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to