This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit f966193420d8bc198a4b3c805e68969e1c724337 Author: Tran Tien Duc <[email protected]> AuthorDate: Thu Jul 18 15:30:09 2019 +0700 JAMES-2836 BlobStore Vault plugging metric records --- mailbox/plugin/deleted-messages-vault/pom.xml | 5 ++ .../vault/blob/BlobStoreDeletedMessageVault.java | 50 ++++++++++++++-- .../blob/BlobStoreDeletedMessageVaultTest.java | 69 +++++++++++++++++++++- .../apache/james/metrics/api/MetricFactory.java | 6 ++ 4 files changed, 123 insertions(+), 7 deletions(-) diff --git a/mailbox/plugin/deleted-messages-vault/pom.xml b/mailbox/plugin/deleted-messages-vault/pom.xml index 23a22a9..5c65510 100644 --- a/mailbox/plugin/deleted-messages-vault/pom.xml +++ b/mailbox/plugin/deleted-messages-vault/pom.xml @@ -94,6 +94,11 @@ <artifactId>james-server-task-json</artifactId> </dependency> <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>metrics-tests</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java index e585d50..67290b4 100644 --- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java +++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java @@ -32,6 +32,7 @@ import org.apache.james.blob.api.BucketName; import org.apache.james.blob.api.ObjectNotFoundException; import org.apache.james.core.User; import org.apache.james.mailbox.model.MessageId; +import org.apache.james.metrics.api.MetricFactory; import org.apache.james.task.Task; import org.apache.james.vault.DeletedMessage; import org.apache.james.vault.DeletedMessageContentNotFoundException; @@ -56,6 +57,14 @@ import reactor.core.scheduler.Schedulers; public class BlobStoreDeletedMessageVault implements DeletedMessageVault { private static final Logger LOGGER = LoggerFactory.getLogger(BlobStoreDeletedMessageVault.class); + private static final String BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC = "deletedMessageVault:blobStore:"; + static final String APPEND_METRIC_NAME = BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC + "append"; + static final String LOAD_MIME_MESSAGE_METRIC_NAME = BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC + "loadMimeMessage"; + static final String SEARCH_METRIC_NAME = BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC + "search"; + static final String DELETE_METRIC_NAME = BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC + "delete"; + static final String DELETE_EXPIRED_MESSAGES_METRIC_NAME = BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC + "deleteExpiredMessages"; + + private final MetricFactory metricFactory; private final DeletedMessageMetadataVault messageMetadataVault; private final BlobStore blobStore; private final BucketNameGenerator nameGenerator; @@ -63,7 +72,11 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault { private final RetentionConfiguration retentionConfiguration; @Inject - BlobStoreDeletedMessageVault(DeletedMessageMetadataVault messageMetadataVault, BlobStore blobStore, BucketNameGenerator nameGenerator, Clock clock, RetentionConfiguration retentionConfiguration) { + BlobStoreDeletedMessageVault(MetricFactory metricFactory, DeletedMessageMetadataVault messageMetadataVault, + BlobStore blobStore, BucketNameGenerator nameGenerator, + Clock clock, + RetentionConfiguration retentionConfiguration) { + this.metricFactory = metricFactory; this.messageMetadataVault = messageMetadataVault; this.blobStore = blobStore; this.nameGenerator = nameGenerator; @@ -76,6 +89,13 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault { Preconditions.checkNotNull(deletedMessage); Preconditions.checkNotNull(mimeMessage); BucketName bucketName = nameGenerator.currentBucket(); + + return metricFactory.runPublishingTimerMetric( + APPEND_METRIC_NAME, + appendMessage(deletedMessage, mimeMessage, bucketName)); + } + + private Mono<Void> appendMessage(DeletedMessage deletedMessage, InputStream mimeMessage, BucketName bucketName) { return blobStore.save(bucketName, mimeMessage) .map(blobId -> StorageInformation.builder() .bucketName(bucketName) @@ -89,8 +109,11 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault { public Publisher<InputStream> loadMimeMessage(User user, MessageId messageId) { Preconditions.checkNotNull(user); Preconditions.checkNotNull(messageId); - return Mono.from(messageMetadataVault.retrieveStorageInformation(user, messageId)) - .flatMap(storageInformation -> loadMimeMessage(storageInformation, user, messageId)); + + return metricFactory.runPublishingTimerMetric( + LOAD_MIME_MESSAGE_METRIC_NAME, + Mono.from(messageMetadataVault.retrieveStorageInformation(user, messageId)) + .flatMap(storageInformation -> loadMimeMessage(storageInformation, user, messageId))); } private Mono<InputStream> loadMimeMessage(StorageInformation storageInformation, User user, MessageId messageId) { @@ -104,6 +127,13 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault { public Publisher<DeletedMessage> search(User user, Query query) { Preconditions.checkNotNull(user); Preconditions.checkNotNull(query); + + return metricFactory.runPublishingTimerMetric( + SEARCH_METRIC_NAME, + searchOn(user, query)); + } + + private Flux<DeletedMessage> searchOn(User user, Query query) { return Flux.from(messageMetadataVault.listRelatedBuckets()) .concatMap(bucketName -> Flux.from(messageMetadataVault.listMessages(bucketName, user))) .map(DeletedMessageWithStorageInformation::getDeletedMessage) @@ -115,6 +145,12 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault { Preconditions.checkNotNull(user); Preconditions.checkNotNull(messageId); + return metricFactory.runPublishingTimerMetric( + DELETE_METRIC_NAME, + deleteMessage(user, messageId)); + } + + private Mono<Void> deleteMessage(User user, MessageId messageId) { return Mono.from(messageMetadataVault.retrieveStorageInformation(user, messageId)) .flatMap(storageInformation -> Mono.from(messageMetadataVault.remove(storageInformation.getBucketName(), user, messageId)) .thenReturn(storageInformation)) @@ -127,10 +163,12 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault { ZonedDateTime now = ZonedDateTime.now(clock); ZonedDateTime beginningOfRetentionPeriod = now.minus(retentionConfiguration.getRetentionPeriod()); - Flux<BucketName> deleteOperation = retentionQualifiedBuckets(beginningOfRetentionPeriod) - .flatMap(bucketName -> deleteBucketData(bucketName).then(Mono.just(bucketName))); + Flux<BucketName> metricAbleDeleteOperation = metricFactory.runPublishingTimerMetric( + DELETE_EXPIRED_MESSAGES_METRIC_NAME, + retentionQualifiedBuckets(beginningOfRetentionPeriod) + .flatMap(bucketName -> deleteBucketData(bucketName).then(Mono.just(bucketName)))); - return new BlobStoreVaultGarbageCollectionTask(beginningOfRetentionPeriod, deleteOperation); + return new BlobStoreVaultGarbageCollectionTask(beginningOfRetentionPeriod, metricAbleDeleteOperation); } @Override diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVaultTest.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVaultTest.java index 9ba4729..a98bf1f 100644 --- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVaultTest.java +++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVaultTest.java @@ -20,9 +20,18 @@ package org.apache.james.vault.blob; import static org.apache.james.vault.DeletedMessageFixture.CONTENT; +import static org.apache.james.vault.DeletedMessageFixture.DELETED_MESSAGE; import static org.apache.james.vault.DeletedMessageFixture.DELETED_MESSAGE_2; +import static org.apache.james.vault.DeletedMessageFixture.MESSAGE_ID; import static org.apache.james.vault.DeletedMessageFixture.NOW; import static org.apache.james.vault.DeletedMessageFixture.OLD_DELETED_MESSAGE; +import static org.apache.james.vault.DeletedMessageFixture.USER; +import static org.apache.james.vault.blob.BlobStoreDeletedMessageVault.APPEND_METRIC_NAME; +import static org.apache.james.vault.blob.BlobStoreDeletedMessageVault.DELETE_METRIC_NAME; +import static org.apache.james.vault.blob.BlobStoreDeletedMessageVault.LOAD_MIME_MESSAGE_METRIC_NAME; +import static org.apache.james.vault.blob.BlobStoreDeletedMessageVault.DELETE_EXPIRED_MESSAGES_METRIC_NAME; +import static org.apache.james.vault.blob.BlobStoreDeletedMessageVault.SEARCH_METRIC_NAME; +import static org.apache.james.vault.search.Query.ALL; import static org.assertj.core.api.Assertions.assertThat; import java.io.ByteArrayInputStream; @@ -32,6 +41,7 @@ import java.time.ZonedDateTime; import org.apache.james.blob.api.BucketName; import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.memory.MemoryBlobStore; +import org.apache.james.metrics.tests.RecordingMetricFactory; import org.apache.james.utils.UpdatableTickingClock; import org.apache.james.vault.DeletedMessageVault; import org.apache.james.vault.DeletedMessageVaultContract; @@ -47,11 +57,13 @@ import reactor.core.publisher.Mono; class BlobStoreDeletedMessageVaultTest implements DeletedMessageVaultContract, DeletedMessageVaultSearchContract.AllContracts { private BlobStoreDeletedMessageVault messageVault; private UpdatableTickingClock clock; + private RecordingMetricFactory metricFactory; @BeforeEach void setUp() { clock = new UpdatableTickingClock(NOW.toInstant()); - messageVault = new BlobStoreDeletedMessageVault(new MemoryDeletedMessageMetadataVault(), + metricFactory = new RecordingMetricFactory(); + messageVault = new BlobStoreDeletedMessageVault(metricFactory, new MemoryDeletedMessageMetadataVault(), new MemoryBlobStore(new HashBlobId.Factory()), new BucketNameGenerator(clock), clock, RetentionConfiguration.DEFAULT); } @@ -90,4 +102,59 @@ class BlobStoreDeletedMessageVaultTest implements DeletedMessageVaultContract, D BucketName.of("deleted-messages-2007-12-01"), BucketName.of("deleted-messages-2008-01-01")); } + + @Test + void appendShouldPublishAppendTimerMetrics() { + Mono.from(messageVault.append(DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))) + .block(); + Mono.from(messageVault.append(DELETED_MESSAGE_2, new ByteArrayInputStream(CONTENT))) + .block(); + + assertThat(metricFactory.executionTimesFor(APPEND_METRIC_NAME)) + .hasSize(2); + } + + @Test + void searchShouldPublishSearchTimerMetrics() { + Mono.from(messageVault.search(USER, ALL)) + .block(); + Mono.from(messageVault.search(USER, ALL)) + .block(); + + assertThat(metricFactory.executionTimesFor(SEARCH_METRIC_NAME)) + .hasSize(2); + } + + @Test + void loadMimeMessageShouldPublishLoadMimeMessageTimerMetrics() { + Mono.from(messageVault.loadMimeMessage(USER, MESSAGE_ID)) + .block(); + Mono.from(messageVault.loadMimeMessage(USER, MESSAGE_ID)) + .block(); + + assertThat(metricFactory.executionTimesFor(LOAD_MIME_MESSAGE_METRIC_NAME)) + .hasSize(2); + } + + @Test + void deleteShouldPublishDeleteTimerMetrics() { + Mono.from(messageVault.delete(USER, MESSAGE_ID)) + .block(); + Mono.from(messageVault.delete(USER, MESSAGE_ID)) + .block(); + + assertThat(metricFactory.executionTimesFor(DELETE_METRIC_NAME)) + .hasSize(2); + } + + @Test + void deleteExpiredMessagesTaskShouldPublishRetentionTimerMetrics() throws Exception { + Mono.from(getVault().append(DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block(); + Mono.from(getVault().delete(USER, DELETED_MESSAGE.getMessageId())).block(); + + getVault().deleteExpiredMessagesTask().run(); + + assertThat(metricFactory.executionTimesFor(DELETE_EXPIRED_MESSAGES_METRIC_NAME)) + .hasSize(1); + } } \ No newline at end of file diff --git a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java index 18c34f7..670c637 100644 --- a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java +++ b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/MetricFactory.java @@ -21,6 +21,7 @@ package org.apache.james.metrics.api; import java.util.function.Supplier; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public interface MetricFactory { @@ -43,6 +44,11 @@ public interface MetricFactory { return mono.doOnSuccess(success -> timer.stopAndPublish()); } + default <T> Flux<T> runPublishingTimerMetric(String name, Flux<T> flux) { + TimeMetric timer = timer(name); + return flux.doOnComplete(timer::stopAndPublish); + } + default void runPublishingTimerMetric(String name, Runnable runnable) { runPublishingTimerMetric(name, () -> { runnable.run(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
