JAMES-2544 Temporarily fix RabbitMQ concurrency issues using a global lock

We also lower the number of mails sent by the concurrent MailQueueContract tests to avoid overwhelming Cassandra.


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/7ed58e9e
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/7ed58e9e
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/7ed58e9e

Branch: refs/heads/master
Commit: 7ed58e9e737e290e3033e3706156ba5fc5d18bfa
Parents: 6bfbc8e
Author: Benoit Tellier <btell...@linagora.com>
Authored: Thu Oct 4 15:49:01 2018 +0700
Committer: Benoit Tellier <btell...@linagora.com>
Committed: Fri Oct 5 09:42:10 2018 +0700

----------------------------------------------------------------------
 .../backend/rabbitmq/RabbitChannelPool.java     | 130 -------------------
 .../backend/rabbitmq/RabbitChannelPoolImpl.java | 115 ++++++++++++++++
 .../backend/rabbitmq/RabbitMQChannelPool.java   |  47 +++++++
 .../backend/rabbitmq/RabbitMQHealthCheck.java   |   8 +-
 .../backend/rabbitmq/SimpleChannelPool.java     |  42 ++++++
 .../backend/rabbitmq/RabbitMQExtension.java     |  14 +-
 .../rabbitmq/RabbitMQHealthCheckTest.java       |   2 +-
 .../james/queue/api/MailQueueContract.java      |   8 +-
 .../james/queue/rabbitmq/RabbitClient.java      |  12 +-
 .../queue/rabbitmq/RabbitMQMailQueueTest.java   |  18 +--
 10 files changed, 228 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/7ed58e9e/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java
----------------------------------------------------------------------
diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java
deleted file mode 100644
index 66ea5b9..0000000
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.backend.rabbitmq;
-
-import java.util.function.Supplier;
-
-import javax.annotation.PreDestroy;
-import javax.inject.Inject;
-
-import org.apache.commons.pool2.BasePooledObjectFactory;
-import org.apache.commons.pool2.ObjectPool;
-import org.apache.commons.pool2.PooledObject;
-import org.apache.commons.pool2.impl.DefaultPooledObject;
-import org.apache.commons.pool2.impl.GenericObjectPool;
-import org.apache.james.util.MemoizedSupplier;
-
-import com.github.fge.lambdas.Throwing;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-
-public class RabbitChannelPool {
-
-    public static class ConnectionFailedException extends RuntimeException {
-        public ConnectionFailedException(Throwable cause) {
-            super(cause);
-        }
-    }
-
-    private static class ChannelBasePooledObjectFactory extends 
BasePooledObjectFactory<Channel> {
-        private final Supplier<Connection> connection;
-
-        public ChannelBasePooledObjectFactory(RabbitMQConnectionFactory 
factory) {
-            this.connection = MemoizedSupplier.of(
-                    Throwing.supplier(() -> factory.create()).sneakyThrow());
-        }
-
-        @Override
-        public Channel create() throws Exception {
-            return connection.get()
-                    .createChannel();
-        }
-
-        @Override
-        public PooledObject<Channel> wrap(Channel obj) {
-            return new DefaultPooledObject<>(obj);
-        }
-
-        @Override
-        public void destroyObject(PooledObject<Channel> pooledObject) throws 
Exception {
-            Channel channel = pooledObject.getObject();
-            channel.close();
-        }
-    }
-
-    @FunctionalInterface
-    public interface RabbitFunction<T, E extends Throwable> {
-        T execute(Channel channel) throws E;
-    }
-
-    @FunctionalInterface
-    public interface RabbitConsumer<E extends Throwable> {
-        void execute(Channel channel) throws E;
-    }
-
-    private final ObjectPool<Channel> pool;
-
-    @Inject
-    public RabbitChannelPool(RabbitMQConnectionFactory factory) {
-        pool = new GenericObjectPool<>(
-            new ChannelBasePooledObjectFactory(factory));
-    }
-
-    public <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws 
E, ConnectionFailedException {
-        Channel channel = borrowChannel();
-        try {
-            return f.execute(channel);
-        } finally {
-            returnChannel(channel);
-        }
-    }
-
-
-    public <E extends Throwable> void execute(RabbitConsumer<E> f) throws E, 
ConnectionFailedException {
-        Channel channel = borrowChannel();
-        try {
-            f.execute(channel);
-        } finally {
-            returnChannel(channel);
-        }
-    }
-
-    @PreDestroy
-    public void close() {
-        pool.close();
-    }
-
-    private Channel borrowChannel() {
-        try {
-            return pool.borrowObject();
-        } catch (Exception e) {
-            throw new ConnectionFailedException(e);
-        }
-    }
-
-    private void returnChannel(Channel channel) {
-        try {
-            pool.returnObject(channel);
-        } catch (Exception ignore) {
-            //ignore when return is failing
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/james-project/blob/7ed58e9e/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java
----------------------------------------------------------------------
diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java
new file mode 100644
index 0000000..fcfdcba
--- /dev/null
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java
@@ -0,0 +1,115 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backend.rabbitmq;
+
+import java.util.function.Supplier;
+
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+
+import org.apache.commons.pool2.BasePooledObjectFactory;
+import org.apache.commons.pool2.ObjectPool;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.james.util.MemoizedSupplier;
+
+import com.github.fge.lambdas.Throwing;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+
+public class RabbitChannelPoolImpl implements RabbitMQChannelPool {
+
+    private static class ChannelBasePooledObjectFactory extends 
BasePooledObjectFactory<Channel> {
+        private final Supplier<Connection> connection;
+
+        public ChannelBasePooledObjectFactory(RabbitMQConnectionFactory 
factory) {
+            this.connection = MemoizedSupplier.of(
+                    Throwing.supplier(() -> factory.create()).sneakyThrow());
+        }
+
+        @Override
+        public Channel create() throws Exception {
+            return connection.get()
+                    .createChannel();
+        }
+
+        @Override
+        public PooledObject<Channel> wrap(Channel obj) {
+            return new DefaultPooledObject<>(obj);
+        }
+
+        @Override
+        public void destroyObject(PooledObject<Channel> pooledObject) throws 
Exception {
+            Channel channel = pooledObject.getObject();
+            channel.close();
+        }
+    }
+
+    private final ObjectPool<Channel> pool;
+
+    @Inject
+    public RabbitChannelPoolImpl(RabbitMQConnectionFactory factory) {
+        pool = new GenericObjectPool<>(
+            new ChannelBasePooledObjectFactory(factory));
+    }
+
+    @Override
+    public <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws 
E, ConnectionFailedException {
+        Channel channel = borrowChannel();
+        try {
+            return f.execute(channel);
+        } finally {
+            returnChannel(channel);
+        }
+    }
+
+    @Override
+    public <E extends Throwable> void execute(RabbitConsumer<E> f) throws E, 
ConnectionFailedException {
+        Channel channel = borrowChannel();
+        try {
+            f.execute(channel);
+        } finally {
+            returnChannel(channel);
+        }
+    }
+
+    @PreDestroy
+    public void close() {
+        pool.close();
+    }
+
+    private Channel borrowChannel() {
+        try {
+            return pool.borrowObject();
+        } catch (Exception e) {
+            throw new ConnectionFailedException(e);
+        }
+    }
+
+    private void returnChannel(Channel channel) {
+        try {
+            pool.returnObject(channel);
+        } catch (Exception ignore) {
+            //ignore when return is failing
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/7ed58e9e/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java
----------------------------------------------------------------------
diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java
new file mode 100644
index 0000000..44b666d
--- /dev/null
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java
@@ -0,0 +1,47 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backend.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+
+public interface RabbitMQChannelPool {
+    class ConnectionFailedException extends RuntimeException {
+        public ConnectionFailedException(Throwable cause) {
+            super(cause);
+        }
+    }
+
+    @FunctionalInterface
+    interface RabbitFunction<T, E extends Throwable> {
+        T execute(Channel channel) throws E;
+    }
+
+    @FunctionalInterface
+    interface RabbitConsumer<E extends Throwable> {
+        void execute(Channel channel) throws E;
+    }
+
+    <T, E extends Throwable> T execute(RabbitFunction<T, E> f)
+        throws E, ConnectionFailedException;
+
+
+    <E extends Throwable> void execute(RabbitConsumer<E> f)
+        throws E, ConnectionFailedException;
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/7ed58e9e/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java
----------------------------------------------------------------------
diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java
index fd9f757..17a7ead 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java
@@ -35,11 +35,11 @@ public class RabbitMQHealthCheck implements HealthCheck {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RabbitMQHealthCheck.class);
     private static final ComponentName COMPONENT_NAME = new 
ComponentName("RabbitMQ backend");
 
-    private final RabbitChannelPool rabbitChannelPool;
+    private final RabbitMQChannelPool rabbitChannelPoolImpl;
 
     @Inject
-    public RabbitMQHealthCheck(RabbitChannelPool rabbitChannelPool) throws 
NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
-        this.rabbitChannelPool = rabbitChannelPool;
+    public RabbitMQHealthCheck(RabbitMQChannelPool rabbitChannelPoolImpl) 
throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
+        this.rabbitChannelPoolImpl = rabbitChannelPoolImpl;
     }
 
     @Override
@@ -50,7 +50,7 @@ public class RabbitMQHealthCheck implements HealthCheck {
     @Override
     public Result check() {
         try {
-            return rabbitChannelPool.execute(channel -> {
+            return rabbitChannelPoolImpl.execute(channel -> {
                     if (channel.isOpen()) {
                         return Result.healthy(COMPONENT_NAME);
                     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/7ed58e9e/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
----------------------------------------------------------------------
diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
new file mode 100644
index 0000000..8381aac
--- /dev/null
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
@@ -0,0 +1,42 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backend.rabbitmq;
+
+import java.io.IOException;
+
+import com.rabbitmq.client.Channel;
+
+public class SimpleChannelPool implements RabbitMQChannelPool {
+    private final Channel channel;
+
+    public SimpleChannelPool(RabbitMQConnectionFactory factory) throws 
IOException {
+        this.channel = factory.create().createChannel();
+    }
+
+    @Override
+    public synchronized  <T, E extends Throwable> T execute(RabbitFunction<T, 
E> f) throws E, ConnectionFailedException {
+        return f.execute(channel);
+    }
+
+    @Override
+    public synchronized  <E extends Throwable> void execute(RabbitConsumer<E> 
f) throws E, ConnectionFailedException {
+        f.execute(channel);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/7ed58e9e/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
----------------------------------------------------------------------
diff --git 
a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
 
b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
index 5326ffb..c8762e7 100644
--- 
a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
+++ 
b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java
@@ -37,7 +37,7 @@ import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
 public class RabbitMQExtension implements BeforeAllCallback, 
BeforeEachCallback, AfterAllCallback, AfterEachCallback, ParameterResolver {
 
     private DockerRabbitMQ rabbitMQ;
-    private RabbitChannelPool rabbitChannelPool;
+    private RabbitChannelPoolImpl rabbitChannelPoolImpl;
 
     @Override
     public void beforeAll(ExtensionContext context) {
@@ -47,12 +47,12 @@ public class RabbitMQExtension implements 
BeforeAllCallback, BeforeEachCallback,
 
     @Override
     public void beforeEach(ExtensionContext extensionContext) throws Exception 
{
-        rabbitChannelPool = createRabbitChannelPool();
+        rabbitChannelPoolImpl = createRabbitChannelPool();
     }
 
     @Override
     public void afterEach(ExtensionContext context) throws Exception {
-        rabbitChannelPool.close();
+        rabbitChannelPoolImpl.close();
         rabbitMQ.reset();
     }
 
@@ -71,15 +71,15 @@ public class RabbitMQExtension implements 
BeforeAllCallback, BeforeEachCallback,
         return rabbitMQ;
     }
 
-    public RabbitChannelPool getRabbitChannelPool() {
-        return rabbitChannelPool;
+    public RabbitChannelPoolImpl getRabbitChannelPool() {
+        return rabbitChannelPoolImpl;
     }
 
     public DockerRabbitMQ getRabbitMQ() {
         return rabbitMQ;
     }
 
-    private RabbitChannelPool createRabbitChannelPool() throws 
URISyntaxException {
+    private RabbitChannelPoolImpl createRabbitChannelPool() throws 
URISyntaxException {
         RabbitMQConfiguration rabbitMQConfiguration = 
RabbitMQConfiguration.builder()
             .amqpUri(rabbitMQ.amqpUri())
             .managementUri(rabbitMQ.managementUri())
@@ -89,6 +89,6 @@ public class RabbitMQExtension implements BeforeAllCallback, 
BeforeEachCallback,
         RabbitMQConnectionFactory rabbitMQConnectionFactory = new 
RabbitMQConnectionFactory(
             rabbitMQConfiguration,
             new 
AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()));
-        return new RabbitChannelPool(rabbitMQConnectionFactory);
+        return new RabbitChannelPoolImpl(rabbitMQConnectionFactory);
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/7ed58e9e/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java
----------------------------------------------------------------------
diff --git 
a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java
 
b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java
index bc25ebc..b88ef8f 100644
--- 
a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java
+++ 
b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java
@@ -48,7 +48,7 @@ class RabbitMQHealthCheckTest {
                 new 
AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()));
 
         healthCheck = new RabbitMQHealthCheck(
-            new RabbitChannelPool(rabbitMQConnectionFactory));
+            new RabbitChannelPoolImpl(rabbitMQConnectionFactory));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/james-project/blob/7ed58e9e/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
 
b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
index 8c6418b..b54983a 100644
--- 
a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
+++ 
b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
@@ -353,8 +353,8 @@ public interface MailQueueContract {
         ConcurrentLinkedDeque<Mail> dequeuedMails = new 
ConcurrentLinkedDeque<>();
 
         int threadCount = 10;
-        int operationCount = 100;
-        int totalDequeuedMessages = 500;
+        int operationCount = 10;
+        int totalDequeuedMessages = 50;
         ConcurrentTestRunner.builder()
             .operation((threadNumber, step) -> {
                 if (step % 2 == 0) {
@@ -385,8 +385,8 @@ public interface MailQueueContract {
         ConcurrentLinkedDeque<Mail> dequeuedMails = new 
ConcurrentLinkedDeque<>();
 
         int threadCount = 10;
-        int operationCount = 150;
-        int totalDequeuedMessages = 500;
+        int operationCount = 15;
+        int totalDequeuedMessages = 50;
         ConcurrentTestRunner.builder()
             .operation((threadNumber, step) -> {
                 if (step % 3 == 0) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/7ed58e9e/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
index 8757e33..9263b1f 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java
@@ -22,7 +22,7 @@ package org.apache.james.queue.rabbitmq;
 import java.io.IOException;
 import java.util.Optional;
 
-import org.apache.james.backend.rabbitmq.RabbitChannelPool;
+import org.apache.james.backend.rabbitmq.RabbitMQChannelPool;
 import org.apache.james.queue.api.MailQueue;
 
 import com.google.common.collect.ImmutableMap;
@@ -40,9 +40,9 @@ class RabbitClient {
     private static final String ROUTING_KEY = "";
     public static final boolean REQUEUE = true;
 
-    private final RabbitChannelPool channelPool;
+    private final RabbitMQChannelPool channelPool;
 
-    RabbitClient(RabbitChannelPool channelPool) {
+    RabbitClient(RabbitMQChannelPool channelPool) {
         this.channelPool = channelPool;
     }
 
@@ -69,17 +69,17 @@ class RabbitClient {
     }
 
     void ack(long deliveryTag) throws IOException {
-        RabbitChannelPool.RabbitConsumer<IOException> consumer = channel -> 
channel.basicAck(deliveryTag, !MULTIPLE);
+        RabbitMQChannelPool.RabbitConsumer<IOException> consumer = channel -> 
channel.basicAck(deliveryTag, !MULTIPLE);
         channelPool.execute(consumer);
     }
 
     void nack(long deliveryTag) throws IOException {
-        RabbitChannelPool.RabbitConsumer<IOException> consumer = channel -> 
channel.basicNack(deliveryTag, !MULTIPLE, REQUEUE);
+        RabbitMQChannelPool.RabbitConsumer<IOException> consumer = channel -> 
channel.basicNack(deliveryTag, !MULTIPLE, REQUEUE);
         channelPool.execute(consumer);
     }
 
     Optional<GetResponse> poll(MailQueueName name) throws IOException {
-        RabbitChannelPool.RabbitFunction<Optional<GetResponse>, IOException> f 
= channel ->
+        RabbitMQChannelPool.RabbitFunction<Optional<GetResponse>, IOException> 
f = channel ->
             
Optional.ofNullable(channel.basicGet(name.toWorkQueueName().asString(), 
!AUTO_ACK));
         return channelPool.execute(f);
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/7ed58e9e/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index ee008a9..37507f5 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -35,10 +35,10 @@ import java.util.stream.Stream;
 import javax.mail.internet.MimeMessage;
 
 import org.apache.james.backend.rabbitmq.DockerRabbitMQ;
-import org.apache.james.backend.rabbitmq.RabbitChannelPool;
 import org.apache.james.backend.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
 import org.apache.james.backend.rabbitmq.RabbitMQExtension;
+import org.apache.james.backend.rabbitmq.SimpleChannelPool;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
@@ -128,7 +128,7 @@ public class RabbitMQMailQueueTest implements 
ManageableMailQueueContract, MailQ
         RabbitMQConnectionFactory rabbitMQConnectionFactory = new 
RabbitMQConnectionFactory(rabbitMQConfiguration,
                 new 
AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()));
 
-        RabbitClient rabbitClient = new RabbitClient(new 
RabbitChannelPool(rabbitMQConnectionFactory));
+        RabbitClient rabbitClient = new RabbitClient(new 
SimpleChannelPool(rabbitMQConnectionFactory));
         RabbitMQMailQueueFactory.PrivateFactory factory = new 
RabbitMQMailQueueFactory.PrivateFactory(
             metricTestSystem.getSpyMetricFactory(),
             metricTestSystem.getSpyGaugeRegistry(),
@@ -229,20 +229,6 @@ public class RabbitMQMailQueueTest implements 
ManageableMailQueueContract, MailQ
     public void 
constructorShouldRegisterGetQueueSizeGauge(MailQueueMetricExtension.MailQueueMetricTestSystem
 testSystem) {
     }
 
-    @Test
-    @Override
-    @Disabled("JAMES-2544 acknowledgement need to be done on the original 
channel, which is not permitted by the channelPool")
-    public void concurrentEnqueueDequeueShouldNotFail() {
-
-    }
-
-    @Test
-    @Override
-    @Disabled("JAMES-2544 acknowledgement need to be done on the original 
channel, which is not permitted by the channelPool")
-    public void concurrentEnqueueDequeueWithAckNackShouldNotFail() {
-    }
-
-
     private void enqueueSomeMails(Function<Integer, String> namePattern, int 
emailCount) {
         IntStream.rangeClosed(1, emailCount)
             .forEach(Throwing.intConsumer(i -> enQueue(defaultMail()


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to