JAMES-2544 Temporary fix for RabbitMQ concurrency issues using a global lock. We also lower the number of mails sent in the concurrent queue tests to avoid overwhelming Cassandra.
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/7ed58e9e Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/7ed58e9e Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/7ed58e9e Branch: refs/heads/master Commit: 7ed58e9e737e290e3033e3706156ba5fc5d18bfa Parents: 6bfbc8e Author: Benoit Tellier <btell...@linagora.com> Authored: Thu Oct 4 15:49:01 2018 +0700 Committer: Benoit Tellier <btell...@linagora.com> Committed: Fri Oct 5 09:42:10 2018 +0700 ---------------------------------------------------------------------- .../backend/rabbitmq/RabbitChannelPool.java | 130 ------------------- .../backend/rabbitmq/RabbitChannelPoolImpl.java | 115 ++++++++++++++++ .../backend/rabbitmq/RabbitMQChannelPool.java | 47 +++++++ .../backend/rabbitmq/RabbitMQHealthCheck.java | 8 +- .../backend/rabbitmq/SimpleChannelPool.java | 42 ++++++ .../backend/rabbitmq/RabbitMQExtension.java | 14 +- .../rabbitmq/RabbitMQHealthCheckTest.java | 2 +- .../james/queue/api/MailQueueContract.java | 8 +- .../james/queue/rabbitmq/RabbitClient.java | 12 +- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 18 +-- 10 files changed, 228 insertions(+), 168 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/7ed58e9e/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java deleted file mode 100644 index 66ea5b9..0000000 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java +++ /dev/null @@ -1,130 +0,0 @@ 
-/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.backend.rabbitmq; - -import java.util.function.Supplier; - -import javax.annotation.PreDestroy; -import javax.inject.Inject; - -import org.apache.commons.pool2.BasePooledObjectFactory; -import org.apache.commons.pool2.ObjectPool; -import org.apache.commons.pool2.PooledObject; -import org.apache.commons.pool2.impl.DefaultPooledObject; -import org.apache.commons.pool2.impl.GenericObjectPool; -import org.apache.james.util.MemoizedSupplier; - -import com.github.fge.lambdas.Throwing; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; - -public class RabbitChannelPool { - - public static class ConnectionFailedException extends RuntimeException { - public ConnectionFailedException(Throwable cause) { - super(cause); - } - } - - private static class ChannelBasePooledObjectFactory extends BasePooledObjectFactory<Channel> { - private final Supplier<Connection> connection; - - public ChannelBasePooledObjectFactory(RabbitMQConnectionFactory factory) { - 
this.connection = MemoizedSupplier.of( - Throwing.supplier(() -> factory.create()).sneakyThrow()); - } - - @Override - public Channel create() throws Exception { - return connection.get() - .createChannel(); - } - - @Override - public PooledObject<Channel> wrap(Channel obj) { - return new DefaultPooledObject<>(obj); - } - - @Override - public void destroyObject(PooledObject<Channel> pooledObject) throws Exception { - Channel channel = pooledObject.getObject(); - channel.close(); - } - } - - @FunctionalInterface - public interface RabbitFunction<T, E extends Throwable> { - T execute(Channel channel) throws E; - } - - @FunctionalInterface - public interface RabbitConsumer<E extends Throwable> { - void execute(Channel channel) throws E; - } - - private final ObjectPool<Channel> pool; - - @Inject - public RabbitChannelPool(RabbitMQConnectionFactory factory) { - pool = new GenericObjectPool<>( - new ChannelBasePooledObjectFactory(factory)); - } - - public <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws E, ConnectionFailedException { - Channel channel = borrowChannel(); - try { - return f.execute(channel); - } finally { - returnChannel(channel); - } - } - - - public <E extends Throwable> void execute(RabbitConsumer<E> f) throws E, ConnectionFailedException { - Channel channel = borrowChannel(); - try { - f.execute(channel); - } finally { - returnChannel(channel); - } - } - - @PreDestroy - public void close() { - pool.close(); - } - - private Channel borrowChannel() { - try { - return pool.borrowObject(); - } catch (Exception e) { - throw new ConnectionFailedException(e); - } - } - - private void returnChannel(Channel channel) { - try { - pool.returnObject(channel); - } catch (Exception ignore) { - //ignore when return is failing - } - } - -} http://git-wip-us.apache.org/repos/asf/james-project/blob/7ed58e9e/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java 
---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java new file mode 100644 index 0000000..fcfdcba --- /dev/null +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java @@ -0,0 +1,115 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.backend.rabbitmq; + +import java.util.function.Supplier; + +import javax.annotation.PreDestroy; +import javax.inject.Inject; + +import org.apache.commons.pool2.BasePooledObjectFactory; +import org.apache.commons.pool2.ObjectPool; +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.james.util.MemoizedSupplier; + +import com.github.fge.lambdas.Throwing; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; + +public class RabbitChannelPoolImpl implements RabbitMQChannelPool { + + private static class ChannelBasePooledObjectFactory extends BasePooledObjectFactory<Channel> { + private final Supplier<Connection> connection; + + public ChannelBasePooledObjectFactory(RabbitMQConnectionFactory factory) { + this.connection = MemoizedSupplier.of( + Throwing.supplier(() -> factory.create()).sneakyThrow()); + } + + @Override + public Channel create() throws Exception { + return connection.get() + .createChannel(); + } + + @Override + public PooledObject<Channel> wrap(Channel obj) { + return new DefaultPooledObject<>(obj); + } + + @Override + public void destroyObject(PooledObject<Channel> pooledObject) throws Exception { + Channel channel = pooledObject.getObject(); + channel.close(); + } + } + + private final ObjectPool<Channel> pool; + + @Inject + public RabbitChannelPoolImpl(RabbitMQConnectionFactory factory) { + pool = new GenericObjectPool<>( + new ChannelBasePooledObjectFactory(factory)); + } + + @Override + public <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws E, ConnectionFailedException { + Channel channel = borrowChannel(); + try { + return f.execute(channel); + } finally { + returnChannel(channel); + } + } + + @Override + public <E extends Throwable> void execute(RabbitConsumer<E> f) throws E, 
ConnectionFailedException { + Channel channel = borrowChannel(); + try { + f.execute(channel); + } finally { + returnChannel(channel); + } + } + + @PreDestroy + public void close() { + pool.close(); + } + + private Channel borrowChannel() { + try { + return pool.borrowObject(); + } catch (Exception e) { + throw new ConnectionFailedException(e); + } + } + + private void returnChannel(Channel channel) { + try { + pool.returnObject(channel); + } catch (Exception ignore) { + //ignore when return is failing + } + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/7ed58e9e/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java new file mode 100644 index 0000000..44b666d --- /dev/null +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java @@ -0,0 +1,47 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. 
See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backend.rabbitmq; + +import com.rabbitmq.client.Channel; + +public interface RabbitMQChannelPool { + class ConnectionFailedException extends RuntimeException { + public ConnectionFailedException(Throwable cause) { + super(cause); + } + } + + @FunctionalInterface + interface RabbitFunction<T, E extends Throwable> { + T execute(Channel channel) throws E; + } + + @FunctionalInterface + interface RabbitConsumer<E extends Throwable> { + void execute(Channel channel) throws E; + } + + <T, E extends Throwable> T execute(RabbitFunction<T, E> f) + throws E, ConnectionFailedException; + + + <E extends Throwable> void execute(RabbitConsumer<E> f) + throws E, ConnectionFailedException; +} http://git-wip-us.apache.org/repos/asf/james-project/blob/7ed58e9e/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java index fd9f757..17a7ead 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheck.java @@ -35,11 +35,11 @@ public class RabbitMQHealthCheck implements HealthCheck { private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQHealthCheck.class); private static final ComponentName COMPONENT_NAME = new ComponentName("RabbitMQ backend"); - private final RabbitChannelPool rabbitChannelPool; + private final RabbitMQChannelPool rabbitChannelPoolImpl; @Inject - public 
RabbitMQHealthCheck(RabbitChannelPool rabbitChannelPool) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException { - this.rabbitChannelPool = rabbitChannelPool; + public RabbitMQHealthCheck(RabbitMQChannelPool rabbitChannelPoolImpl) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException { + this.rabbitChannelPoolImpl = rabbitChannelPoolImpl; } @Override @@ -50,7 +50,7 @@ public class RabbitMQHealthCheck implements HealthCheck { @Override public Result check() { try { - return rabbitChannelPool.execute(channel -> { + return rabbitChannelPoolImpl.execute(channel -> { if (channel.isOpen()) { return Result.healthy(COMPONENT_NAME); } http://git-wip-us.apache.org/repos/asf/james-project/blob/7ed58e9e/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java new file mode 100644 index 0000000..8381aac --- /dev/null +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java @@ -0,0 +1,42 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. 
You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backend.rabbitmq; + +import java.io.IOException; + +import com.rabbitmq.client.Channel; + +public class SimpleChannelPool implements RabbitMQChannelPool { + private final Channel channel; + + public SimpleChannelPool(RabbitMQConnectionFactory factory) throws IOException { + this.channel = factory.create().createChannel(); + } + + @Override + public synchronized <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws E, ConnectionFailedException { + return f.execute(channel); + } + + @Override + public synchronized <E extends Throwable> void execute(RabbitConsumer<E> f) throws E, ConnectionFailedException { + f.execute(channel); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/7ed58e9e/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java index 5326ffb..c8762e7 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java @@ -37,7 +37,7 @@ import com.nurkiewicz.asyncretry.AsyncRetryExecutor; public class RabbitMQExtension implements 
BeforeAllCallback, BeforeEachCallback, AfterAllCallback, AfterEachCallback, ParameterResolver { private DockerRabbitMQ rabbitMQ; - private RabbitChannelPool rabbitChannelPool; + private RabbitChannelPoolImpl rabbitChannelPoolImpl; @Override public void beforeAll(ExtensionContext context) { @@ -47,12 +47,12 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback, @Override public void beforeEach(ExtensionContext extensionContext) throws Exception { - rabbitChannelPool = createRabbitChannelPool(); + rabbitChannelPoolImpl = createRabbitChannelPool(); } @Override public void afterEach(ExtensionContext context) throws Exception { - rabbitChannelPool.close(); + rabbitChannelPoolImpl.close(); rabbitMQ.reset(); } @@ -71,15 +71,15 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback, return rabbitMQ; } - public RabbitChannelPool getRabbitChannelPool() { - return rabbitChannelPool; + public RabbitChannelPoolImpl getRabbitChannelPool() { + return rabbitChannelPoolImpl; } public DockerRabbitMQ getRabbitMQ() { return rabbitMQ; } - private RabbitChannelPool createRabbitChannelPool() throws URISyntaxException { + private RabbitChannelPoolImpl createRabbitChannelPool() throws URISyntaxException { RabbitMQConfiguration rabbitMQConfiguration = RabbitMQConfiguration.builder() .amqpUri(rabbitMQ.amqpUri()) .managementUri(rabbitMQ.managementUri()) @@ -89,6 +89,6 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback, RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory( rabbitMQConfiguration, new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())); - return new RabbitChannelPool(rabbitMQConnectionFactory); + return new RabbitChannelPoolImpl(rabbitMQConnectionFactory); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/7ed58e9e/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java 
---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java index bc25ebc..b88ef8f 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java @@ -48,7 +48,7 @@ class RabbitMQHealthCheckTest { new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())); healthCheck = new RabbitMQHealthCheck( - new RabbitChannelPool(rabbitMQConnectionFactory)); + new RabbitChannelPoolImpl(rabbitMQConnectionFactory)); } @Test http://git-wip-us.apache.org/repos/asf/james-project/blob/7ed58e9e/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java index 8c6418b..b54983a 100644 --- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java +++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java @@ -353,8 +353,8 @@ public interface MailQueueContract { ConcurrentLinkedDeque<Mail> dequeuedMails = new ConcurrentLinkedDeque<>(); int threadCount = 10; - int operationCount = 100; - int totalDequeuedMessages = 500; + int operationCount = 10; + int totalDequeuedMessages = 50; ConcurrentTestRunner.builder() .operation((threadNumber, step) -> { if (step % 2 == 0) { @@ -385,8 +385,8 @@ public interface MailQueueContract { ConcurrentLinkedDeque<Mail> dequeuedMails = new ConcurrentLinkedDeque<>(); int threadCount = 10; - int operationCount = 150; - int 
totalDequeuedMessages = 500; + int operationCount = 15; + int totalDequeuedMessages = 50; ConcurrentTestRunner.builder() .operation((threadNumber, step) -> { if (step % 3 == 0) { http://git-wip-us.apache.org/repos/asf/james-project/blob/7ed58e9e/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java index 8757e33..9263b1f 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java @@ -22,7 +22,7 @@ package org.apache.james.queue.rabbitmq; import java.io.IOException; import java.util.Optional; -import org.apache.james.backend.rabbitmq.RabbitChannelPool; +import org.apache.james.backend.rabbitmq.RabbitMQChannelPool; import org.apache.james.queue.api.MailQueue; import com.google.common.collect.ImmutableMap; @@ -40,9 +40,9 @@ class RabbitClient { private static final String ROUTING_KEY = ""; public static final boolean REQUEUE = true; - private final RabbitChannelPool channelPool; + private final RabbitMQChannelPool channelPool; - RabbitClient(RabbitChannelPool channelPool) { + RabbitClient(RabbitMQChannelPool channelPool) { this.channelPool = channelPool; } @@ -69,17 +69,17 @@ class RabbitClient { } void ack(long deliveryTag) throws IOException { - RabbitChannelPool.RabbitConsumer<IOException> consumer = channel -> channel.basicAck(deliveryTag, !MULTIPLE); + RabbitMQChannelPool.RabbitConsumer<IOException> consumer = channel -> channel.basicAck(deliveryTag, !MULTIPLE); channelPool.execute(consumer); } void nack(long deliveryTag) throws IOException { - RabbitChannelPool.RabbitConsumer<IOException> consumer = channel -> 
channel.basicNack(deliveryTag, !MULTIPLE, REQUEUE); + RabbitMQChannelPool.RabbitConsumer<IOException> consumer = channel -> channel.basicNack(deliveryTag, !MULTIPLE, REQUEUE); channelPool.execute(consumer); } Optional<GetResponse> poll(MailQueueName name) throws IOException { - RabbitChannelPool.RabbitFunction<Optional<GetResponse>, IOException> f = channel -> + RabbitMQChannelPool.RabbitFunction<Optional<GetResponse>, IOException> f = channel -> Optional.ofNullable(channel.basicGet(name.toWorkQueueName().asString(), !AUTO_ACK)); return channelPool.execute(f); } http://git-wip-us.apache.org/repos/asf/james-project/blob/7ed58e9e/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index ee008a9..37507f5 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -35,10 +35,10 @@ import java.util.stream.Stream; import javax.mail.internet.MimeMessage; import org.apache.james.backend.rabbitmq.DockerRabbitMQ; -import org.apache.james.backend.rabbitmq.RabbitChannelPool; import org.apache.james.backend.rabbitmq.RabbitMQConfiguration; import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory; import org.apache.james.backend.rabbitmq.RabbitMQExtension; +import org.apache.james.backend.rabbitmq.SimpleChannelPool; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; import org.apache.james.backends.cassandra.components.CassandraModule; @@ -128,7 +128,7 @@ public class RabbitMQMailQueueTest implements 
ManageableMailQueueContract, MailQ RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory(rabbitMQConfiguration, new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())); - RabbitClient rabbitClient = new RabbitClient(new RabbitChannelPool(rabbitMQConnectionFactory)); + RabbitClient rabbitClient = new RabbitClient(new SimpleChannelPool(rabbitMQConnectionFactory)); RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory( metricTestSystem.getSpyMetricFactory(), metricTestSystem.getSpyGaugeRegistry(), @@ -229,20 +229,6 @@ public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQ public void constructorShouldRegisterGetQueueSizeGauge(MailQueueMetricExtension.MailQueueMetricTestSystem testSystem) { } - @Test - @Override - @Disabled("JAMES-2544 acknowledgement need to be done on the original channel, which is not permitted by the channelPool") - public void concurrentEnqueueDequeueShouldNotFail() { - - } - - @Test - @Override - @Disabled("JAMES-2544 acknowledgement need to be done on the original channel, which is not permitted by the channelPool") - public void concurrentEnqueueDequeueWithAckNackShouldNotFail() { - } - - private void enqueueSomeMails(Function<Integer, String> namePattern, int emailCount) { IntStream.rangeClosed(1, emailCount) .forEach(Throwing.intConsumer(i -> enQueue(defaultMail() --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org