JAMES-1693 Add an in memory implementation for mail queues and use it with guice in memory implementation
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/f89a2abd Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/f89a2abd Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/f89a2abd Branch: refs/heads/master Commit: f89a2abd2d135b070e5ba0297c8f8685de0cbe2f Parents: f13f173 Author: Benoit Tellier <btell...@linagora.com> Authored: Wed Mar 9 14:49:35 2016 +0700 Committer: Matthieu Baechler <matthieu.baech...@linagora.com> Committed: Wed Mar 23 17:07:13 2016 +0100 ---------------------------------------------------------------------- .../org/apache/james/MemoryJamesServerMain.java | 4 +- .../modules/server/MemoryMailQueueFactory.java | 130 +++++++++++++++++++ .../modules/server/MemoryMailQueueModule.java | 47 +++++++ .../server/MemoryMailQueueFactoryTest.java | 122 +++++++++++++++++ 4 files changed, 301 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/f89a2abd/server/container/guice/memory-guice/src/main/java/org/apache/james/MemoryJamesServerMain.java ---------------------------------------------------------------------- diff --git a/server/container/guice/memory-guice/src/main/java/org/apache/james/MemoryJamesServerMain.java b/server/container/guice/memory-guice/src/main/java/org/apache/james/MemoryJamesServerMain.java index 991ba1e..b3dffbc 100644 --- a/server/container/guice/memory-guice/src/main/java/org/apache/james/MemoryJamesServerMain.java +++ b/server/container/guice/memory-guice/src/main/java/org/apache/james/MemoryJamesServerMain.java @@ -22,8 +22,8 @@ package org.apache.james; import org.apache.james.mailbox.inmemory.InMemoryId; import org.apache.james.modules.data.MemoryDataModule; import org.apache.james.modules.mailbox.MemoryMailboxModule; -import org.apache.james.modules.server.ActiveMQQueueModule; import org.apache.james.modules.server.JMXServerModule; +import org.apache.james.modules.server.MemoryMailQueueModule; import org.apache.james.modules.server.QuotaModule; import com.google.inject.Module; @@ -38,7 +38,7 @@ public class MemoryJamesServerMain { new MemoryDataModule(), new MemoryMailboxModule(), new QuotaModule(), - new ActiveMQQueueModule<>(inMemoryId)); + new MemoryMailQueueModule<>(inMemoryId)); public static void main(String[] args) throws Exception { new GuiceJamesServer<>(inMemoryId) http://git-wip-us.apache.org/repos/asf/james-project/blob/f89a2abd/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/server/MemoryMailQueueFactory.java ---------------------------------------------------------------------- diff --git a/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/server/MemoryMailQueueFactory.java b/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/server/MemoryMailQueueFactory.java new file mode 100644 index 0000000..39846ed --- /dev/null +++ b/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/server/MemoryMailQueueFactory.java @@ -0,0 +1,130 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.modules.server; + +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import org.apache.james.queue.api.MailQueue; +import org.apache.james.queue.api.MailQueueFactory; +import org.apache.james.queue.api.MailQueueItemDecoratorFactory; +import org.apache.mailet.Mail; + +import com.google.common.base.Objects; +import com.google.common.base.Throwables; +import com.google.inject.Inject; +import com.google.inject.Singleton; + +@Singleton +public class MemoryMailQueueFactory implements MailQueueFactory { + + private final ConcurrentHashMap<String, MailQueue> mailQueues; + private final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory; + + @Inject + public MemoryMailQueueFactory(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) { + this.mailQueues = new ConcurrentHashMap<>(); + this.mailQueueItemDecoratorFactory = mailQueueItemDecoratorFactory; + } + + @Override + public MailQueue getQueue(String name) { + return Optional.ofNullable(mailQueues.get(name)) + .orElseGet(() -> tryInsertNewMailQueue(name)); + } + + private MailQueue tryInsertNewMailQueue(String name) { + MailQueue newMailQueue = new MemoryMailQueue(name, mailQueueItemDecoratorFactory); + return Optional.ofNullable(mailQueues.putIfAbsent(name, newMailQueue)) + .orElse(newMailQueue); + } + + public static class MemoryMailQueue implements MailQueue { + + private final LinkedBlockingDeque<MemoryMailQueueItem> mailItems; + private final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory; + private final String name; + + public MemoryMailQueue(String name,MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) { + this.mailItems = new LinkedBlockingDeque<>(); + this.name = name; + this.mailQueueItemDecoratorFactory = mailQueueItemDecoratorFactory; + } + + @Override + public void enQueue(Mail mail, long delay, TimeUnit unit) throws MailQueueException { + enQueue(mail); + } + + @Override + public void enQueue(Mail mail) throws MailQueueException { + mailItems.addFirst(new MemoryMailQueueItem(mail)); + } + + @Override + public MailQueueItem deQueue() throws MailQueueException { + while (true) { + try { + MemoryMailQueueItem item = mailItems.take(); + return mailQueueItemDecoratorFactory.decorate(item); + } catch (InterruptedException e) { + Throwables.propagate(e); + } + } + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + + MemoryMailQueue that = (MemoryMailQueue) o; + + return Objects.equal(this.name, that.name); + } + + @Override + public int hashCode() { + return Objects.hashCode(name); + } + } + + public static class MemoryMailQueueItem implements MailQueue.MailQueueItem { + + private final Mail mail; + + public MemoryMailQueueItem(Mail mail) { + this.mail = mail; + } + + @Override + public Mail getMail() { + return mail; + } + + @Override + public void done(boolean success) throws MailQueue.MailQueueException { + + } + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/f89a2abd/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/server/MemoryMailQueueModule.java ---------------------------------------------------------------------- diff --git a/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/server/MemoryMailQueueModule.java b/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/server/MemoryMailQueueModule.java new file mode 100644 index 0000000..c41f101 --- /dev/null +++ b/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/server/MemoryMailQueueModule.java @@ -0,0 +1,47 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.modules.server; + +import org.apache.james.jmap.send.PostDequeueDecoratorFactory; +import org.apache.james.mailbox.store.mail.model.MailboxId; +import org.apache.james.queue.api.MailQueueFactory; +import org.apache.james.queue.api.MailQueueItemDecoratorFactory; +import org.apache.james.utils.GuiceGenericType; + +import com.google.inject.AbstractModule; +import com.google.inject.Singleton; +import com.google.inject.TypeLiteral; + +public class MemoryMailQueueModule<Id extends MailboxId> extends AbstractModule { + + private final GuiceGenericType<Id> guiceGenericType; + + public MemoryMailQueueModule(TypeLiteral<Id> type) { + guiceGenericType = new GuiceGenericType<>(type); + } + + @Override + protected void configure() { + bind(MailQueueFactory.class).to(MemoryMailQueueFactory.class); + bind(MailQueueItemDecoratorFactory.class) + .to(guiceGenericType.newGenericType(PostDequeueDecoratorFactory.class)) + .in(Singleton.class); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/f89a2abd/server/container/guice/memory-guice/src/test/java/org/apache/james/server/MemoryMailQueueFactoryTest.java ---------------------------------------------------------------------- diff --git a/server/container/guice/memory-guice/src/test/java/org/apache/james/server/MemoryMailQueueFactoryTest.java b/server/container/guice/memory-guice/src/test/java/org/apache/james/server/MemoryMailQueueFactoryTest.java new file mode 100644 index 0000000..560378f --- /dev/null +++ b/server/container/guice/memory-guice/src/test/java/org/apache/james/server/MemoryMailQueueFactoryTest.java @@ -0,0 +1,122 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.server; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.james.modules.server.MemoryMailQueueFactory; +import org.apache.james.queue.api.MailQueue; +import org.apache.james.queue.api.MailQueue.MailQueueItem; +import org.apache.james.queue.api.MailQueueItemDecoratorFactory; +import org.apache.mailet.Mail; +import org.apache.mailet.base.test.FakeMail; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.base.Throwables; + +public class MemoryMailQueueFactoryTest { + + public static final String KEY = "key"; + public static final String BIS = "bis"; + + private MemoryMailQueueFactory memoryMailQueueFactory; + private ExecutorService executorService; + + @Before + public void setUp() { + memoryMailQueueFactory = new MemoryMailQueueFactory(mailQueueItem -> new MailQueueItemDecoratorFactory.MailQueueItemDecorator(mailQueueItem) { + @Override + public Mail getMail() { + return mailQueueItem.getMail(); + } + + @Override + public void done(boolean success) throws MailQueue.MailQueueException { + + } + }); + executorService = Executors.newFixedThreadPool(2); + } + + @After + public void tearDown() { + executorService.shutdownNow(); + } + + @Test + public void getQueueShouldNotReturnNull() { + assertThat(memoryMailQueueFactory.getQueue(KEY)).isNotNull(); + } + + @Test + public void getQueueShouldReturnTwoTimeTheSameResultWhenUsedWithTheSameKey() { + assertThat(memoryMailQueueFactory.getQueue(KEY)).isEqualTo(memoryMailQueueFactory.getQueue(KEY)); + } + + @Test + public void getQueueShouldNotReturnTheSameQueueForTwoDifferentNames() { + assertThat(memoryMailQueueFactory.getQueue(KEY)).isNotEqualTo(memoryMailQueueFactory.getQueue(BIS)); + } + + @Test + public void dequeueShouldWork() throws Exception{ + Mail mail = new FakeMail(); + memoryMailQueueFactory.getQueue(KEY).enQueue(mail); + assertThat(memoryMailQueueFactory.getQueue(KEY).deQueue().getMail()).isEqualTo(mail); + } + + @Test + public void dequeueShouldWorkWithMultipleMessages() throws Exception{ + Mail mail1 = new FakeMail(); + Mail mail2 = new FakeMail(); + memoryMailQueueFactory.getQueue(KEY).enQueue(mail1); + memoryMailQueueFactory.getQueue(KEY).enQueue(mail2); + assertThat(memoryMailQueueFactory.getQueue(KEY).deQueue().getMail()).isEqualTo(mail2); + assertThat(memoryMailQueueFactory.getQueue(KEY).deQueue().getMail()).isEqualTo(mail1); + } + + @Test(timeout = 20000) + public void deQueueShouldWaitForAMailToBeEnqueued() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + Mail mail = new FakeMail(); + executorService.submit(() -> { + try { + latch.await(); + memoryMailQueueFactory.getQueue(KEY).enQueue(mail); + } catch (Exception e) { + throw Throwables.propagate(e); + } + }); + Future<MailQueueItem> tryDequeue = executorService.submit(() -> memoryMailQueueFactory.getQueue(KEY).deQueue()); + assertThatThrownBy(() -> tryDequeue.get(100, TimeUnit.MILLISECONDS)).isInstanceOf(TimeoutException.class); + latch.countDown(); + assertThat(tryDequeue.get().getMail()).isEqualTo(mail); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org