JAMES-2540 RabbitMQ Mail Queue should comply with the metric contract

Note that, because getSize() is not yet implemented, the corresponding gauge
is not yet registered.


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/fc034826
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/fc034826
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/fc034826

Branch: refs/heads/master
Commit: fc034826eeae8b60d5674bbea26365f449fc70d6
Parents: 6e8d222
Author: Benoit Tellier <[email protected]>
Authored: Tue Sep 11 09:51:00 2018 +0700
Committer: Benoit Tellier <[email protected]>
Committed: Fri Sep 14 11:05:42 2018 +0700

----------------------------------------------------------------------
 .../apache/james/metrics/api/MetricFactory.java | 16 ++++++-
 .../james/metrics/api/NoopMetricFactory.java    |  7 ---
 .../dropwizard/DropWizardMetricFactory.java     | 12 -----
 .../metrics/logger/DefaultMetricFactory.java    | 12 -----
 server/queue/queue-rabbitmq/pom.xml             |  4 ++
 .../james/queue/rabbitmq/RabbitMQMailQueue.java | 49 +++++++++++++++-----
 .../queue/rabbitmq/RabbitMQMailQueueTest.java   | 28 ++++++++---
 .../rabbitmq/RabbitMqMailQueueFactoryTest.java  | 10 ++--
 8 files changed, 86 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/fc034826/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java
----------------------------------------------------------------------
diff --git 
a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java
 
b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java
index 10937b0..ca63d43 100644
--- 
a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java
+++ 
b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java
@@ -27,5 +27,19 @@ public interface MetricFactory {
 
     TimeMetric timer(String name);
 
-    <T> T withMetric(String name, Supplier<T> operation);
+    default <T> T withMetric(String name, Supplier<T> operation) {
+        TimeMetric timer = timer(name);
+        try {
+            return operation.get();
+        } finally {
+            timer.stopAndPublish();
+        }
+    }
+
+    default void withMetric(String name, Runnable runnable) {
+        withMetric(name, () -> {
+            runnable.run();
+            return null;
+        });
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/fc034826/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/NoopMetricFactory.java
----------------------------------------------------------------------
diff --git 
a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/NoopMetricFactory.java
 
b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/NoopMetricFactory.java
index f5d82ca..446af5a 100644
--- 
a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/NoopMetricFactory.java
+++ 
b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/NoopMetricFactory.java
@@ -18,8 +18,6 @@
  ****************************************************************/
 package org.apache.james.metrics.api;
 
-import java.util.function.Supplier;
-
 public class NoopMetricFactory implements MetricFactory {
 
     @Override
@@ -65,9 +63,4 @@ public class NoopMetricFactory implements MetricFactory {
         }
     }
 
-    @Override
-    public <T> T withMetric(String name, Supplier<T> operation) {
-        return operation.get();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/fc034826/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java
----------------------------------------------------------------------
diff --git 
a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java
 
b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java
index f2d5b11..500259b 100644
--- 
a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java
+++ 
b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java
@@ -19,8 +19,6 @@
 
 package org.apache.james.metrics.dropwizard;
 
-import java.util.function.Supplier;
-
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
@@ -54,16 +52,6 @@ public class DropWizardMetricFactory implements 
MetricFactory {
         return new DropWizardTimeMetric(name, 
metricRegistry.timer(name).time());
     }
 
-    @Override
-    public <T> T withMetric(String name, Supplier<T> operation) {
-        TimeMetric timer = timer(name);
-        try {
-            return operation.get();
-        } finally {
-            timer.stopAndPublish();
-        }
-    }
-
     @PostConstruct
     public void start() {
         jmxReporter.start();

http://git-wip-us.apache.org/repos/asf/james-project/blob/fc034826/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java
----------------------------------------------------------------------
diff --git 
a/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java
 
b/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java
index 2997805..b00e3f3 100644
--- 
a/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java
+++ 
b/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java
@@ -18,8 +18,6 @@
  ****************************************************************/
 package org.apache.james.metrics.logger;
 
-import java.util.function.Supplier;
-
 import org.apache.james.metrics.api.Metric;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.metrics.api.TimeMetric;
@@ -40,14 +38,4 @@ public class DefaultMetricFactory implements MetricFactory {
         return new DefaultTimeMetric(name);
     }
 
-    @Override
-    public <T> T withMetric(String name, Supplier<T> operation) {
-        TimeMetric timer = timer(name);
-        try {
-            return operation.get();
-        } finally {
-            timer.stopAndPublish();
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/fc034826/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml 
b/server/queue/queue-rabbitmq/pom.xml
index 3a70738..2bc2a58 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -95,6 +95,10 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>metrics-api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/fc034826/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index f9ad07c..8f9b68e 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -37,6 +37,9 @@ import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.Store;
 import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.core.MailAddress;
+import org.apache.james.metrics.api.GaugeRegistry;
+import org.apache.james.metrics.api.Metric;
+import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.server.core.MailImpl;
 import org.apache.james.util.SerializationUtil;
@@ -90,19 +93,24 @@ public class RabbitMQMailQueue implements MailQueue {
     }
 
     static class Factory {
+        private final MetricFactory metricFactory;
+        private final GaugeRegistry gaugeRegistry;
         private final RabbitClient rabbitClient;
         private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
         private final BlobId.Factory blobIdFactory;
 
         @Inject
-        @VisibleForTesting Factory(RabbitClient rabbitClient, 
Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, BlobId.Factory 
blobIdFactory) {
+        @VisibleForTesting Factory(MetricFactory metricFactory, GaugeRegistry 
gaugeRegistry, RabbitClient rabbitClient,
+                                   Store<MimeMessage, MimeMessagePartsId> 
mimeMessageStore, BlobId.Factory blobIdFactory) {
+            this.metricFactory = metricFactory;
+            this.gaugeRegistry = gaugeRegistry;
             this.rabbitClient = rabbitClient;
             this.mimeMessageStore = mimeMessageStore;
             this.blobIdFactory = blobIdFactory;
         }
 
         RabbitMQMailQueue create(MailQueueName mailQueueName) {
-            return new RabbitMQMailQueue(mailQueueName, rabbitClient, 
mimeMessageStore, blobIdFactory);
+            return new RabbitMQMailQueue(metricFactory, gaugeRegistry, 
mailQueueName, rabbitClient, mimeMessageStore, blobIdFactory);
         }
     }
 
@@ -113,8 +121,13 @@ public class RabbitMQMailQueue implements MailQueue {
     private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
     private final BlobId.Factory blobIdFactory;
     private final ObjectMapper objectMapper;
+    private final MetricFactory metricFactory;
+    private final Metric enqueueMetric;
+    private final Metric dequeueMetric;
+    private final GaugeRegistry gaugeRegistry;
 
-    RabbitMQMailQueue(MailQueueName name, RabbitClient rabbitClient, 
Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, BlobId.Factory 
blobIdFactory) {
+    RabbitMQMailQueue(MetricFactory metricFactory, GaugeRegistry 
gaugeRegistry, MailQueueName name, RabbitClient rabbitClient,
+                      Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, 
BlobId.Factory blobIdFactory) {
         this.mimeMessageStore = mimeMessageStore;
         this.blobIdFactory = blobIdFactory;
         this.name = name;
@@ -123,6 +136,11 @@ public class RabbitMQMailQueue implements MailQueue {
             .registerModule(new Jdk8Module())
             .registerModule(new JavaTimeModule())
             .registerModule(new GuavaModule());
+
+        this.metricFactory = metricFactory;
+        this.gaugeRegistry = gaugeRegistry;
+        this.enqueueMetric = 
metricFactory.generate(ENQUEUED_METRIC_NAME_PREFIX + name.asString());
+        this.dequeueMetric = 
metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString());
     }
 
     @Override
@@ -140,10 +158,15 @@ public class RabbitMQMailQueue implements MailQueue {
 
     @Override
     public void enQueue(Mail mail) throws MailQueueException {
-        MimeMessagePartsId partsId = saveBlobs(mail).join();
-        MailDTO mailDTO = MailDTO.fromMail(mail, partsId);
-        byte[] message = getMessageBytes(mailDTO);
-        rabbitClient.publish(name, message);
+        metricFactory.withMetric(ENQUEUED_TIMER_METRIC_NAME_PREFIX + 
name.asString(),
+            Throwing.runnable(() -> {
+                MimeMessagePartsId partsId = saveBlobs(mail).join();
+                MailDTO mailDTO = MailDTO.fromMail(mail, partsId);
+                byte[] message = getMessageBytes(mailDTO);
+                rabbitClient.publish(name, message);
+
+                enqueueMetric.increment();
+            }).sneakyThrow());
     }
 
     private CompletableFuture<MimeMessagePartsId> saveBlobs(Mail mail) throws 
MailQueueException {
@@ -164,10 +187,14 @@ public class RabbitMQMailQueue implements MailQueue {
 
     @Override
     public MailQueueItem deQueue() throws MailQueueException {
-        GetResponse getResponse = pollChannel();
-        MailDTO mailDTO = toDTO(getResponse);
-        Mail mail = toMail(mailDTO);
-        return new RabbitMQMailQueueItem(rabbitClient, 
getResponse.getEnvelope().getDeliveryTag(), mail);
+        return metricFactory.withMetric(DEQUEUED_TIMER_METRIC_NAME_PREFIX + 
name.asString(),
+            Throwing.supplier(() -> {
+                GetResponse getResponse = pollChannel();
+                MailDTO mailDTO = toDTO(getResponse);
+                Mail mail = toMail(mailDTO);
+                dequeueMetric.increment();
+                return new RabbitMQMailQueueItem(rabbitClient, 
getResponse.getEnvelope().getDeliveryTag(), mail);
+            }).sneakyThrow());
     }
 
     private MailDTO toDTO(GetResponse getResponse) throws MailQueueException {

http://git-wip-us.apache.org/repos/asf/james-project/blob/fc034826/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 811c361..04e1b72 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -42,18 +42,23 @@ import org.apache.james.blob.cassandra.CassandraBlobModule;
 import org.apache.james.blob.cassandra.CassandraBlobsDAO;
 import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.blob.mail.MimeMessageStore;
+import org.apache.james.metrics.api.NoopGaugeRegistry;
+import org.apache.james.metrics.api.NoopMetricFactory;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueueContract;
+import org.apache.james.queue.api.MailQueueMetricContract;
+import org.apache.james.queue.api.MailQueueMetricExtension;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
 
 @ExtendWith({ReusableDockerRabbitMQExtension.class, 
DockerCassandraExtension.class})
-public class RabbitMQMailQueueTest implements MailQueueContract {
+public class RabbitMQMailQueueTest implements MailQueueContract, 
MailQueueMetricContract {
     private static final HashBlobId.Factory BLOB_ID_FACTORY = new 
HashBlobId.Factory();
 
     private static CassandraCluster cassandra;
@@ -66,7 +71,7 @@ public class RabbitMQMailQueueTest implements 
MailQueueContract {
     }
 
     @BeforeEach
-    void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException, 
URISyntaxException {
+    void setup(DockerRabbitMQ rabbitMQ, 
MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws 
IOException, TimeoutException, URISyntaxException {
         CassandraBlobsDAO blobsDAO = new 
CassandraBlobsDAO(cassandra.getConf(), 
CassandraConfiguration.DEFAULT_CONFIGURATION, BLOB_ID_FACTORY);
         Store<MimeMessage, MimeMessagePartsId> mimeMessageStore = 
MimeMessageStore.factory(blobsDAO).mimeMessageStore();
 
@@ -81,17 +86,22 @@ public class RabbitMQMailQueueTest implements 
MailQueueContract {
             .setPort(rabbitMQ.getAdminPort())
             .build();
 
-
         RabbitMQConfiguration rabbitMQConfiguration = 
RabbitMQConfiguration.builder()
             .amqpUri(amqpUri)
             .managementUri(rabbitManagementUri)
             .build();
 
-        RabbitMQConnectionFactory rabbitMQConnectionFactory = new 
RabbitMQConnectionFactory(rabbitMQConfiguration,
-                new 
AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()));
+        RabbitMQConnectionFactory rabbitMQConnectionFactory = new 
RabbitMQConnectionFactory(
+            rabbitMQConfiguration,
+            new 
AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()));
 
         RabbitClient rabbitClient = new RabbitClient(new 
RabbitChannelPool(rabbitMQConnectionFactory));
-        RabbitMQMailQueue.Factory factory = new 
RabbitMQMailQueue.Factory(rabbitClient, mimeMessageStore, BLOB_ID_FACTORY);
+        RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(
+            metricTestSystem.getSpyMetricFactory(),
+            metricTestSystem.getSpyGaugeRegistry(),
+            rabbitClient,
+            mimeMessageStore,
+            BLOB_ID_FACTORY);
         RabbitMQManagementApi mqManagementApi = new 
RabbitMQManagementApi(rabbitManagementUri, new 
RabbitMQManagementCredentials("guest", "guest".toCharArray()));
         mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, 
mqManagementApi, factory);
     }
@@ -110,4 +120,10 @@ public class RabbitMQMailQueueTest implements 
MailQueueContract {
     public MailQueue getMailQueue() {
         return mailQueueFactory.createQueue("spool");
     }
+
+    @Disabled("RabbitMQ Mail Queue do not yet implement getSize()")
+    @Override
+    public void 
constructorShouldRegisterGetQueueSizeGauge(MailQueueMetricExtension.MailQueueMetricTestSystem
 testSystem) {
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/fc034826/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
index 1df4d92..08e4b78 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
@@ -42,6 +42,8 @@ import org.apache.james.blob.cassandra.CassandraBlobModule;
 import org.apache.james.blob.cassandra.CassandraBlobsDAO;
 import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.blob.mail.MimeMessageStore;
+import org.apache.james.metrics.api.NoopGaugeRegistry;
+import org.apache.james.metrics.api.NoopMetricFactory;
 import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.api.MailQueueFactoryContract;
 import org.junit.jupiter.api.AfterAll;
@@ -87,11 +89,13 @@ class RabbitMqMailQueueFactoryTest implements 
MailQueueFactoryContract<RabbitMQM
             .managementUri(rabbitManagementUri)
             .build();
 
-        RabbitMQConnectionFactory rabbitMQConnectionFactory = new 
RabbitMQConnectionFactory(rabbitMQConfiguration,
-                new 
AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()));
+        RabbitMQConnectionFactory rabbitMQConnectionFactory = new 
RabbitMQConnectionFactory(
+            rabbitMQConfiguration,
+            new 
AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()));
 
         RabbitClient rabbitClient = new RabbitClient(new 
RabbitChannelPool(rabbitMQConnectionFactory));
-        RabbitMQMailQueue.Factory factory = new 
RabbitMQMailQueue.Factory(rabbitClient, mimeMessageStore, BLOB_ID_FACTORY);
+        RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(new 
NoopMetricFactory(), new NoopGaugeRegistry(), rabbitClient, mimeMessageStore, 
BLOB_ID_FACTORY);
+
         RabbitMQManagementApi mqManagementApi = new 
RabbitMQManagementApi(rabbitManagementUri, new 
RabbitMQManagementCredentials("guest", "guest".toCharArray()));
         mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, 
mqManagementApi, factory);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to