JAMES-2540 RabbitMQ Mail Queue should comply with the metric contract Note that as getSize() is not yet implemented the corresponding gauge is not yet registered...
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/fc034826 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/fc034826 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/fc034826 Branch: refs/heads/master Commit: fc034826eeae8b60d5674bbea26365f449fc70d6 Parents: 6e8d222 Author: Benoit Tellier <[email protected]> Authored: Tue Sep 11 09:51:00 2018 +0700 Committer: Benoit Tellier <[email protected]> Committed: Fri Sep 14 11:05:42 2018 +0700 ---------------------------------------------------------------------- .../apache/james/metrics/api/MetricFactory.java | 16 ++++++- .../james/metrics/api/NoopMetricFactory.java | 7 --- .../dropwizard/DropWizardMetricFactory.java | 12 ----- .../metrics/logger/DefaultMetricFactory.java | 12 ----- server/queue/queue-rabbitmq/pom.xml | 4 ++ .../james/queue/rabbitmq/RabbitMQMailQueue.java | 49 +++++++++++++++----- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 28 ++++++++--- .../rabbitmq/RabbitMqMailQueueFactoryTest.java | 10 ++-- 8 files changed, 86 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/fc034826/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java ---------------------------------------------------------------------- diff --git a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java index 10937b0..ca63d43 100644 --- a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java +++ b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java @@ -27,5 +27,19 @@ public interface MetricFactory { TimeMetric timer(String name); - <T> T withMetric(String name, Supplier<T> operation); + default <T> T withMetric(String name, Supplier<T> operation) { + TimeMetric timer = timer(name); + try { + return operation.get(); + } finally { + timer.stopAndPublish(); + } + } + + default void withMetric(String name, Runnable runnable) { + withMetric(name, () -> { + runnable.run(); + return null; + }); + } } http://git-wip-us.apache.org/repos/asf/james-project/blob/fc034826/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/NoopMetricFactory.java ---------------------------------------------------------------------- diff --git a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/NoopMetricFactory.java b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/NoopMetricFactory.java index f5d82ca..446af5a 100644 --- a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/NoopMetricFactory.java +++ b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/NoopMetricFactory.java @@ -18,8 +18,6 @@ ****************************************************************/ package org.apache.james.metrics.api; -import java.util.function.Supplier; - public class NoopMetricFactory implements MetricFactory { @Override @@ -65,9 +63,4 @@ public class NoopMetricFactory implements MetricFactory { } } - @Override - public <T> T withMetric(String name, Supplier<T> operation) { - return operation.get(); - } - } http://git-wip-us.apache.org/repos/asf/james-project/blob/fc034826/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java ---------------------------------------------------------------------- diff --git a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java index f2d5b11..500259b 100644 --- a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java +++ b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java @@ -19,8 +19,6 @@ package org.apache.james.metrics.dropwizard; -import java.util.function.Supplier; - import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.inject.Inject; @@ -54,16 +52,6 @@ public class DropWizardMetricFactory implements MetricFactory { return new DropWizardTimeMetric(name, metricRegistry.timer(name).time()); } - @Override - public <T> T withMetric(String name, Supplier<T> operation) { - TimeMetric timer = timer(name); - try { - return operation.get(); - } finally { - timer.stopAndPublish(); - } - } - @PostConstruct public void start() { jmxReporter.start(); http://git-wip-us.apache.org/repos/asf/james-project/blob/fc034826/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java ---------------------------------------------------------------------- diff --git a/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java b/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java index 2997805..b00e3f3 100644 --- a/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java +++ b/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java @@ -18,8 +18,6 @@ ****************************************************************/ package org.apache.james.metrics.logger; -import java.util.function.Supplier; - import org.apache.james.metrics.api.Metric; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.metrics.api.TimeMetric; @@ -40,14 +38,4 @@ public class DefaultMetricFactory implements MetricFactory { return new DefaultTimeMetric(name); } - @Override - public <T> T withMetric(String name, Supplier<T> operation) { - TimeMetric timer = timer(name); - try { - return operation.get(); - } finally { - timer.stopAndPublish(); - } - } - } http://git-wip-us.apache.org/repos/asf/james-project/blob/fc034826/server/queue/queue-rabbitmq/pom.xml ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml index 3a70738..2bc2a58 100644 --- a/server/queue/queue-rabbitmq/pom.xml +++ b/server/queue/queue-rabbitmq/pom.xml @@ -95,6 +95,10 @@ <scope>test</scope> </dependency> <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>metrics-api</artifactId> + </dependency> + <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/james-project/blob/fc034826/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java index f9ad07c..8f9b68e 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java @@ -37,6 +37,9 @@ import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.Store; import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.core.MailAddress; +import org.apache.james.metrics.api.GaugeRegistry; +import org.apache.james.metrics.api.Metric; +import org.apache.james.metrics.api.MetricFactory; import org.apache.james.queue.api.MailQueue; import org.apache.james.server.core.MailImpl; import org.apache.james.util.SerializationUtil; @@ -90,19 +93,24 @@ public class RabbitMQMailQueue implements MailQueue { } static class Factory { + private final MetricFactory metricFactory; + private final GaugeRegistry gaugeRegistry; private final RabbitClient rabbitClient; private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore; private final BlobId.Factory blobIdFactory; @Inject - @VisibleForTesting Factory(RabbitClient rabbitClient, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, BlobId.Factory blobIdFactory) { + @VisibleForTesting Factory(MetricFactory metricFactory, GaugeRegistry gaugeRegistry, RabbitClient rabbitClient, + Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, BlobId.Factory blobIdFactory) { + this.metricFactory = metricFactory; + this.gaugeRegistry = gaugeRegistry; this.rabbitClient = rabbitClient; this.mimeMessageStore = mimeMessageStore; this.blobIdFactory = blobIdFactory; } RabbitMQMailQueue create(MailQueueName mailQueueName) { - return new RabbitMQMailQueue(mailQueueName, rabbitClient, mimeMessageStore, blobIdFactory); + return new RabbitMQMailQueue(metricFactory, gaugeRegistry, mailQueueName, rabbitClient, mimeMessageStore, blobIdFactory); } } @@ -113,8 +121,13 @@ public class RabbitMQMailQueue implements MailQueue { private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore; private final BlobId.Factory blobIdFactory; private final ObjectMapper objectMapper; + private final MetricFactory metricFactory; + private final Metric enqueueMetric; + private final Metric dequeueMetric; + private final GaugeRegistry gaugeRegistry; - RabbitMQMailQueue(MailQueueName name, RabbitClient rabbitClient, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, BlobId.Factory blobIdFactory) { + RabbitMQMailQueue(MetricFactory metricFactory, GaugeRegistry gaugeRegistry, MailQueueName name, RabbitClient rabbitClient, + Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, BlobId.Factory blobIdFactory) { this.mimeMessageStore = mimeMessageStore; this.blobIdFactory = blobIdFactory; this.name = name; @@ -123,6 +136,11 @@ public class RabbitMQMailQueue implements MailQueue { .registerModule(new Jdk8Module()) .registerModule(new JavaTimeModule()) .registerModule(new GuavaModule()); + + this.metricFactory = metricFactory; + this.gaugeRegistry = gaugeRegistry; + this.enqueueMetric = metricFactory.generate(ENQUEUED_METRIC_NAME_PREFIX + name.asString()); + this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString()); } @Override @@ -140,10 +158,15 @@ public class RabbitMQMailQueue implements MailQueue { @Override public void enQueue(Mail mail) throws MailQueueException { - MimeMessagePartsId partsId = saveBlobs(mail).join(); - MailDTO mailDTO = MailDTO.fromMail(mail, partsId); - byte[] message = getMessageBytes(mailDTO); - rabbitClient.publish(name, message); + metricFactory.withMetric(ENQUEUED_TIMER_METRIC_NAME_PREFIX + name.asString(), + Throwing.runnable(() -> { + MimeMessagePartsId partsId = saveBlobs(mail).join(); + MailDTO mailDTO = MailDTO.fromMail(mail, partsId); + byte[] message = getMessageBytes(mailDTO); + rabbitClient.publish(name, message); + + enqueueMetric.increment(); + }).sneakyThrow()); } private CompletableFuture<MimeMessagePartsId> saveBlobs(Mail mail) throws MailQueueException { @@ -164,10 +187,14 @@ public class RabbitMQMailQueue implements MailQueue { @Override public MailQueueItem deQueue() throws MailQueueException { - GetResponse getResponse = pollChannel(); - MailDTO mailDTO = toDTO(getResponse); - Mail mail = toMail(mailDTO); - return new RabbitMQMailQueueItem(rabbitClient, getResponse.getEnvelope().getDeliveryTag(), mail); + return metricFactory.withMetric(DEQUEUED_TIMER_METRIC_NAME_PREFIX + name.asString(), + Throwing.supplier(() -> { + GetResponse getResponse = pollChannel(); + MailDTO mailDTO = toDTO(getResponse); + Mail mail = toMail(mailDTO); + dequeueMetric.increment(); + return new RabbitMQMailQueueItem(rabbitClient, getResponse.getEnvelope().getDeliveryTag(), mail); + }).sneakyThrow()); } private MailDTO toDTO(GetResponse getResponse) throws MailQueueException { http://git-wip-us.apache.org/repos/asf/james-project/blob/fc034826/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index 811c361..04e1b72 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -42,18 +42,23 @@ import org.apache.james.blob.cassandra.CassandraBlobModule; import org.apache.james.blob.cassandra.CassandraBlobsDAO; import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.blob.mail.MimeMessageStore; +import org.apache.james.metrics.api.NoopGaugeRegistry; +import org.apache.james.metrics.api.NoopMetricFactory; import org.apache.james.queue.api.MailQueue; import org.apache.james.queue.api.MailQueueContract; +import org.apache.james.queue.api.MailQueueMetricContract; +import org.apache.james.queue.api.MailQueueMetricExtension; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.extension.ExtendWith; import com.nurkiewicz.asyncretry.AsyncRetryExecutor; @ExtendWith({ReusableDockerRabbitMQExtension.class, DockerCassandraExtension.class}) -public class RabbitMQMailQueueTest implements MailQueueContract { +public class RabbitMQMailQueueTest implements MailQueueContract, MailQueueMetricContract { private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); private static CassandraCluster cassandra; @@ -66,7 +71,7 @@ public class RabbitMQMailQueueTest implements MailQueueContract { } @BeforeEach - void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException, URISyntaxException { + void setup(DockerRabbitMQ rabbitMQ, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws IOException, TimeoutException, URISyntaxException { CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION, BLOB_ID_FACTORY); Store<MimeMessage, MimeMessagePartsId> mimeMessageStore = MimeMessageStore.factory(blobsDAO).mimeMessageStore(); @@ -81,17 +86,22 @@ public class RabbitMQMailQueueTest implements MailQueueContract { .setPort(rabbitMQ.getAdminPort()) .build(); - RabbitMQConfiguration rabbitMQConfiguration = RabbitMQConfiguration.builder() .amqpUri(amqpUri) .managementUri(rabbitManagementUri) .build(); - RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory(rabbitMQConfiguration, - new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())); + RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory( + rabbitMQConfiguration, + new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())); RabbitClient rabbitClient = new RabbitClient(new RabbitChannelPool(rabbitMQConnectionFactory)); - RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(rabbitClient, mimeMessageStore, BLOB_ID_FACTORY); + RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory( + metricTestSystem.getSpyMetricFactory(), + metricTestSystem.getSpyGaugeRegistry(), + rabbitClient, + mimeMessageStore, + BLOB_ID_FACTORY); RabbitMQManagementApi mqManagementApi = new RabbitMQManagementApi(rabbitManagementUri, new RabbitMQManagementCredentials("guest", "guest".toCharArray())); mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory); } @@ -110,4 +120,10 @@ public class RabbitMQMailQueueTest implements MailQueueContract { public MailQueue getMailQueue() { return mailQueueFactory.createQueue("spool"); } + + @Disabled("RabbitMQ Mail Queue do not yet implement getSize()") + @Override + public void constructorShouldRegisterGetQueueSizeGauge(MailQueueMetricExtension.MailQueueMetricTestSystem testSystem) { + + } } http://git-wip-us.apache.org/repos/asf/james-project/blob/fc034826/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java index 1df4d92..08e4b78 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java @@ -42,6 +42,8 @@ import org.apache.james.blob.cassandra.CassandraBlobModule; import org.apache.james.blob.cassandra.CassandraBlobsDAO; import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.blob.mail.MimeMessageStore; +import org.apache.james.metrics.api.NoopGaugeRegistry; +import org.apache.james.metrics.api.NoopMetricFactory; import org.apache.james.queue.api.MailQueueFactory; import org.apache.james.queue.api.MailQueueFactoryContract; import org.junit.jupiter.api.AfterAll; @@ -87,11 +89,13 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM .managementUri(rabbitManagementUri) .build(); - RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory(rabbitMQConfiguration, - new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())); + RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory( + rabbitMQConfiguration, + new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())); RabbitClient rabbitClient = new RabbitClient(new RabbitChannelPool(rabbitMQConnectionFactory)); - RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(rabbitClient, mimeMessageStore, BLOB_ID_FACTORY); + RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(new NoopMetricFactory(), new NoopGaugeRegistry(), rabbitClient, mimeMessageStore, BLOB_ID_FACTORY); + RabbitMQManagementApi mqManagementApi = new RabbitMQManagementApi(rabbitManagementUri, new RabbitMQManagementCredentials("guest", "guest".toCharArray())); mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
