This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e03d38f6032b2e06a70ac4dbf9b487b36ff99de6 Author: Rene Cordier <[email protected]> AuthorDate: Thu Apr 16 17:36:45 2020 +0700 JAMES-3138 Reactify CurrentQuotaManager --- .../james/mailbox/quota/CurrentQuotaManager.java | 6 +- .../quota/CassandraCurrentQuotaManager.java | 36 ++++++------ .../mailbox/jpa/quota/JpaCurrentQuotaManager.java | 66 ++++++++++++---------- .../quota/InMemoryCurrentQuotaManager.java | 43 +++++++------- .../quota/InMemoryCurrentQuotaManagerTest.java | 10 ++-- .../store/quota/ListeningCurrentQuotaUpdater.java | 53 ++++++++--------- .../store/quota/StoreCurrentQuotaManager.java | 7 ++- .../mailbox/store/quota/StoreQuotaManager.java | 6 +- .../quota/ListeningCurrentQuotaUpdaterTest.java | 7 ++- .../store/quota/StoreCurrentQuotaManagerTest.java | 24 ++++---- .../mailbox/store/quota/StoreQuotaManagerTest.java | 16 ++++-- .../james/webadmin/routes/UserQuotaRoutesTest.java | 4 +- 12 files changed, 149 insertions(+), 129 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/quota/CurrentQuotaManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/quota/CurrentQuotaManager.java index 06cc970..f1f3d41 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/quota/CurrentQuotaManager.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/quota/CurrentQuotaManager.java @@ -21,16 +21,16 @@ package org.apache.james.mailbox.quota; import org.apache.james.core.quota.QuotaCountUsage; import org.apache.james.core.quota.QuotaSizeUsage; -import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.QuotaRoot; +import org.reactivestreams.Publisher; /** * This interface allows us to get the current value associated to a quota value */ public interface CurrentQuotaManager { - QuotaCountUsage getCurrentMessageCount(QuotaRoot quotaRoot) throws MailboxException; + Publisher<QuotaCountUsage> getCurrentMessageCount(QuotaRoot quotaRoot); - QuotaSizeUsage getCurrentStorage(QuotaRoot quotaRoot) throws MailboxException; + Publisher<QuotaSizeUsage> getCurrentStorage(QuotaRoot quotaRoot); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraCurrentQuotaManager.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraCurrentQuotaManager.java index 0254999..a4ba974 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraCurrentQuotaManager.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraCurrentQuotaManager.java @@ -28,6 +28,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.update; import javax.inject.Inject; +import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.core.quota.QuotaCountUsage; import org.apache.james.core.quota.QuotaSizeUsage; import org.apache.james.mailbox.cassandra.table.CassandraCurrentQuota; @@ -36,12 +37,13 @@ import org.apache.james.mailbox.model.QuotaRoot; import org.apache.james.mailbox.store.quota.StoreCurrentQuotaManager; import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; +import reactor.core.publisher.Mono; + public class CassandraCurrentQuotaManager implements StoreCurrentQuotaManager { - private final Session session; + private final CassandraAsyncExecutor cassandraAsyncExecutor; private final PreparedStatement increaseStatement; private final PreparedStatement decreaseStatement; private final PreparedStatement getCurrentMessageCountStatement; @@ -49,7 +51,7 @@ public class CassandraCurrentQuotaManager implements StoreCurrentQuotaManager { @Inject public CassandraCurrentQuotaManager(Session session) { - this.session = session; + this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); this.increaseStatement = session.prepare(update(CassandraCurrentQuota.TABLE_NAME) .with(incr(CassandraCurrentQuota.MESSAGE_COUNT, bindMarker())) .and(incr(CassandraCurrentQuota.STORAGE, bindMarker())) @@ -67,34 +69,30 @@ public class CassandraCurrentQuotaManager implements StoreCurrentQuotaManager { } @Override - public void increase(QuotaOperation quotaOperation) { - session.execute(increaseStatement.bind(quotaOperation.count().asLong(), + public Mono<Void> increase(QuotaOperation quotaOperation) { + return cassandraAsyncExecutor.executeVoid(increaseStatement.bind(quotaOperation.count().asLong(), quotaOperation.size().asLong(), quotaOperation.quotaRoot().getValue())); } @Override - public void decrease(QuotaOperation quotaOperation) { - session.execute(decreaseStatement.bind(quotaOperation.count().asLong(), + public Mono<Void> decrease(QuotaOperation quotaOperation) { + return cassandraAsyncExecutor.executeVoid(decreaseStatement.bind(quotaOperation.count().asLong(), quotaOperation.size().asLong(), quotaOperation.quotaRoot().getValue())); } @Override - public QuotaCountUsage getCurrentMessageCount(QuotaRoot quotaRoot) { - ResultSet resultSet = session.execute(getCurrentMessageCountStatement.bind(quotaRoot.getValue())); - if (resultSet.isExhausted()) { - return QuotaCountUsage.count(0L); - } - return QuotaCountUsage.count(resultSet.one().getLong(CassandraCurrentQuota.MESSAGE_COUNT)); + public Mono<QuotaCountUsage> getCurrentMessageCount(QuotaRoot quotaRoot) { + return cassandraAsyncExecutor.executeSingleRow(getCurrentMessageCountStatement.bind(quotaRoot.getValue())) + .map(row -> QuotaCountUsage.count(row.getLong(CassandraCurrentQuota.MESSAGE_COUNT))) + .defaultIfEmpty(QuotaCountUsage.count(0L)); } @Override - public QuotaSizeUsage getCurrentStorage(QuotaRoot quotaRoot) { - ResultSet resultSet = session.execute(getCurrentStorageStatement.bind(quotaRoot.getValue())); - if (resultSet.isExhausted()) { - return QuotaSizeUsage.size(0L); - } - return QuotaSizeUsage.size(resultSet.one().getLong(CassandraCurrentQuota.STORAGE)); + public Mono<QuotaSizeUsage> getCurrentStorage(QuotaRoot quotaRoot) { + return cassandraAsyncExecutor.executeSingleRow(getCurrentStorageStatement.bind(quotaRoot.getValue())) + .map(row -> QuotaSizeUsage.size(row.getLong(CassandraCurrentQuota.STORAGE))) + .defaultIfEmpty(QuotaSizeUsage.size(0L)); } } diff --git a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/quota/JpaCurrentQuotaManager.java b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/quota/JpaCurrentQuotaManager.java index 48ef98a..f56bab0 100644 --- a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/quota/JpaCurrentQuotaManager.java +++ b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/quota/JpaCurrentQuotaManager.java @@ -33,6 +33,8 @@ import org.apache.james.mailbox.model.QuotaOperation; import org.apache.james.mailbox.model.QuotaRoot; import org.apache.james.mailbox.store.quota.StoreCurrentQuotaManager; +import reactor.core.publisher.Mono; + public class JpaCurrentQuotaManager implements StoreCurrentQuotaManager { public static final long NO_MESSAGES = 0L; @@ -48,49 +50,53 @@ public class JpaCurrentQuotaManager implements StoreCurrentQuotaManager { } @Override - public QuotaCountUsage getCurrentMessageCount(QuotaRoot quotaRoot) { + public Mono<QuotaCountUsage> getCurrentMessageCount(QuotaRoot quotaRoot) { EntityManager entityManager = entityManagerFactory.createEntityManager(); - return Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot)) + + return Mono.fromCallable(() -> Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot)) .map(JpaCurrentQuota::getMessageCount) - .orElse(QuotaCountUsage.count(NO_STORED_BYTES)); + .orElse(QuotaCountUsage.count(NO_STORED_BYTES))); } @Override - public QuotaSizeUsage getCurrentStorage(QuotaRoot quotaRoot) { + public Mono<QuotaSizeUsage> getCurrentStorage(QuotaRoot quotaRoot) { EntityManager entityManager = entityManagerFactory.createEntityManager(); - return Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot)) + + return Mono.fromCallable(() -> Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot)) .map(JpaCurrentQuota::getSize) - .orElse(QuotaSizeUsage.size(NO_STORED_BYTES)); + .orElse(QuotaSizeUsage.size(NO_STORED_BYTES))); } @Override - public void increase(QuotaOperation quotaOperation) { - transactionRunner.run( - entityManager -> { - QuotaRoot quotaRoot = quotaOperation.quotaRoot(); - - JpaCurrentQuota jpaCurrentQuota = Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot)) - .orElse(new JpaCurrentQuota(quotaRoot.getValue(), NO_MESSAGES, NO_STORED_BYTES)); - - entityManager.merge(new JpaCurrentQuota(quotaRoot.getValue(), - jpaCurrentQuota.getMessageCount().asLong() + quotaOperation.count().asLong(), - jpaCurrentQuota.getSize().asLong() + quotaOperation.size().asLong())); - }); + public Mono<Void> increase(QuotaOperation quotaOperation) { + return Mono.fromRunnable(() -> + transactionRunner.run( + entityManager -> { + QuotaRoot quotaRoot = quotaOperation.quotaRoot(); + + JpaCurrentQuota jpaCurrentQuota = Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot)) + .orElse(new JpaCurrentQuota(quotaRoot.getValue(), NO_MESSAGES, NO_STORED_BYTES)); + + entityManager.merge(new JpaCurrentQuota(quotaRoot.getValue(), + jpaCurrentQuota.getMessageCount().asLong() + quotaOperation.count().asLong(), + jpaCurrentQuota.getSize().asLong() + quotaOperation.size().asLong())); + })); } @Override - public void decrease(QuotaOperation quotaOperation) { - transactionRunner.run( - entityManager -> { - QuotaRoot quotaRoot = quotaOperation.quotaRoot(); - - JpaCurrentQuota jpaCurrentQuota = Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot)) - .orElse(new JpaCurrentQuota(quotaRoot.getValue(), NO_MESSAGES, NO_STORED_BYTES)); - - entityManager.merge(new JpaCurrentQuota(quotaRoot.getValue(), - jpaCurrentQuota.getMessageCount().asLong() - quotaOperation.count().asLong(), - jpaCurrentQuota.getSize().asLong() - quotaOperation.size().asLong())); - }); + public Mono<Void> decrease(QuotaOperation quotaOperation) { + return Mono.fromRunnable(() -> + transactionRunner.run( + entityManager -> { + QuotaRoot quotaRoot = quotaOperation.quotaRoot(); + + JpaCurrentQuota jpaCurrentQuota = Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot)) + .orElse(new JpaCurrentQuota(quotaRoot.getValue(), NO_MESSAGES, NO_STORED_BYTES)); + + entityManager.merge(new JpaCurrentQuota(quotaRoot.getValue(), + jpaCurrentQuota.getMessageCount().asLong() - quotaOperation.count().asLong(), + jpaCurrentQuota.getSize().asLong() - quotaOperation.size().asLong())); + })); } private JpaCurrentQuota retrieveUserQuota(EntityManager entityManager, QuotaRoot quotaRoot) { diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManager.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManager.java index 6a917e6..fcbe5ff 100644 --- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManager.java +++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManager.java @@ -19,7 +19,6 @@ package org.apache.james.mailbox.inmemory.quota; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.UnaryOperator; @@ -40,6 +39,8 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import reactor.core.publisher.Mono; + public class InMemoryCurrentQuotaManager implements StoreCurrentQuotaManager { private final LoadingCache<QuotaRoot, AtomicReference<CurrentQuotas>> quotaCache; @@ -55,38 +56,34 @@ public class InMemoryCurrentQuotaManager implements StoreCurrentQuotaManager { } @Override - public void increase(QuotaOperation quotaOperation) throws MailboxException { - updateQuota(quotaOperation.quotaRoot(), quota -> quota.increase(new CurrentQuotas(quotaOperation.count(), quotaOperation.size()))); + public Mono<Void> increase(QuotaOperation quotaOperation) { + return updateQuota(quotaOperation.quotaRoot(), quota -> quota.increase(new CurrentQuotas(quotaOperation.count(), quotaOperation.size()))); } @Override - public void decrease(QuotaOperation quotaOperation) throws MailboxException { - updateQuota(quotaOperation.quotaRoot(), quota -> quota.decrease(new CurrentQuotas(quotaOperation.count(), quotaOperation.size()))); + public Mono<Void> decrease(QuotaOperation quotaOperation) { + return updateQuota(quotaOperation.quotaRoot(), quota -> quota.decrease(new CurrentQuotas(quotaOperation.count(), quotaOperation.size()))); } @Override - public QuotaCountUsage getCurrentMessageCount(QuotaRoot quotaRoot) throws MailboxException { - try { - return quotaCache.get(quotaRoot).get().count(); - } catch (ExecutionException e) { - throw new MailboxException("Exception caught", e); - } + public Mono<QuotaCountUsage> getCurrentMessageCount(QuotaRoot quotaRoot) { + return Mono.fromCallable(() -> quotaCache.get(quotaRoot).get().count()) + .onErrorMap(this::wrapAsMailboxException); } @Override - public QuotaSizeUsage getCurrentStorage(QuotaRoot quotaRoot) throws MailboxException { - try { - return quotaCache.get(quotaRoot).get().size(); - } catch (ExecutionException e) { - throw new MailboxException("Exception caught", e); - } + public Mono<QuotaSizeUsage> getCurrentStorage(QuotaRoot quotaRoot) { + return Mono.fromCallable(() -> quotaCache.get(quotaRoot).get().size()) + .onErrorMap(this::wrapAsMailboxException); + } + + private Mono<Void> updateQuota(QuotaRoot quotaRoot, UnaryOperator<CurrentQuotas> quotaFunction) { + return Mono.fromCallable(() -> quotaCache.get(quotaRoot).updateAndGet(quotaFunction)) + .onErrorMap(this::wrapAsMailboxException) + .then(); } - private void updateQuota(QuotaRoot quotaRoot, UnaryOperator<CurrentQuotas> quotaFunction) throws MailboxException { - try { - quotaCache.get(quotaRoot).updateAndGet(quotaFunction); - } catch (ExecutionException e) { - throw new MailboxException("Exception caught", e); - } + private Throwable wrapAsMailboxException(Throwable throwable) { + return new MailboxException("Exception caught", throwable); } } diff --git a/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManagerTest.java b/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManagerTest.java index 2014706..26e2b1f 100644 --- a/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManagerTest.java +++ b/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManagerTest.java @@ -56,7 +56,7 @@ class InMemoryCurrentQuotaManagerTest { when(mockedCurrentQuotaCalculator.recalculateCurrentQuotas(QUOTA_ROOT, null)) .thenReturn(CURRENT_QUOTAS); - assertThat(testee.getCurrentMessageCount(QUOTA_ROOT)).isEqualTo(QuotaCountUsage.count(18)); + assertThat(testee.getCurrentMessageCount(QUOTA_ROOT).block()).isEqualTo(QuotaCountUsage.count(18)); } @Test @@ -64,7 +64,7 @@ class InMemoryCurrentQuotaManagerTest { when(mockedCurrentQuotaCalculator.recalculateCurrentQuotas(QUOTA_ROOT, null)) .thenReturn(CURRENT_QUOTAS); - assertThat(testee.getCurrentStorage(QUOTA_ROOT)).isEqualTo(QuotaSizeUsage.size(512)); + assertThat(testee.getCurrentStorage(QUOTA_ROOT).block()).isEqualTo(QuotaSizeUsage.size(512)); } @Test @@ -73,9 +73,9 @@ class InMemoryCurrentQuotaManagerTest { .thenReturn(CURRENT_QUOTAS); QuotaOperation quotaOperation = new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100)); - testee.increase(quotaOperation); + testee.increase(quotaOperation).block(); - assertThat(testee.getCurrentMessageCount(QUOTA_ROOT)).isEqualTo(QuotaCountUsage.count(28)); - assertThat(testee.getCurrentStorage(QUOTA_ROOT)).isEqualTo(QuotaSizeUsage.size(612)); + assertThat(testee.getCurrentMessageCount(QUOTA_ROOT).block()).isEqualTo(QuotaCountUsage.count(28)); + assertThat(testee.getCurrentStorage(QUOTA_ROOT).block()).isEqualTo(QuotaSizeUsage.size(612)); } } diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java index bf96d76..c8bc83a 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java @@ -40,6 +40,8 @@ import org.apache.james.mailbox.store.event.EventFactory; import com.github.fge.lambdas.Throwing; import com.google.common.collect.ImmutableSet; +import reactor.core.publisher.Mono; + public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailboxListener, QuotaUpdater { public static class ListeningCurrentQuotaUpdaterGroup extends Group { @@ -89,36 +91,34 @@ public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailbo private void handleExpungedEvent(Expunged expunged, QuotaRoot quotaRoot) { computeQuotaOperation(expunged, quotaRoot).ifPresent(Throwing.<QuotaOperation>consumer(quotaOperation -> { - currentQuotaManager.decrease(quotaOperation); - - eventBus.dispatch( - EventFactory.quotaUpdated() - .randomEventId() - .user(expunged.getUsername()) - .quotaRoot(quotaRoot) - .quotaCount(quotaManager.getMessageQuota(quotaRoot)) - .quotaSize(quotaManager.getStorageQuota(quotaRoot)) - .instant(Instant.now()) - .build(), - NO_REGISTRATION_KEYS) + currentQuotaManager.decrease(quotaOperation) + .then(Mono.defer(Throwing.supplier(() -> eventBus.dispatch( + EventFactory.quotaUpdated() + .randomEventId() + .user(expunged.getUsername()) + .quotaRoot(quotaRoot) + .quotaCount(quotaManager.getMessageQuota(quotaRoot)) + .quotaSize(quotaManager.getStorageQuota(quotaRoot)) + .instant(Instant.now()) + .build(), + NO_REGISTRATION_KEYS)).sneakyThrow())) .block(); }).sneakyThrow()); } private void handleAddedEvent(Added added, QuotaRoot quotaRoot) { computeQuotaOperation(added, quotaRoot).ifPresent(Throwing.<QuotaOperation>consumer(quotaOperation -> { - currentQuotaManager.increase(quotaOperation); - - eventBus.dispatch( - EventFactory.quotaUpdated() - .randomEventId() - .user(added.getUsername()) - .quotaRoot(quotaRoot) - .quotaCount(quotaManager.getMessageQuota(quotaRoot)) - .quotaSize(quotaManager.getStorageQuota(quotaRoot)) - .instant(Instant.now()) - .build(), - NO_REGISTRATION_KEYS) + currentQuotaManager.increase(quotaOperation) + .then(Mono.defer(Throwing.supplier(() -> eventBus.dispatch( + EventFactory.quotaUpdated() + .randomEventId() + .user(added.getUsername()) + .quotaRoot(quotaRoot) + .quotaCount(quotaManager.getMessageQuota(quotaRoot)) + .quotaSize(quotaManager.getStorageQuota(quotaRoot)) + .instant(Instant.now()) + .build(), + NO_REGISTRATION_KEYS)).sneakyThrow())) .block(); }).sneakyThrow()); } @@ -144,8 +144,9 @@ public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailbo boolean mailboxContainedMessages = mailboxDeletionEvent.getDeletedMessageCount().asLong() > 0; if (mailboxContainedMessages) { currentQuotaManager.decrease(new QuotaOperation(mailboxDeletionEvent.getQuotaRoot(), - mailboxDeletionEvent.getDeletedMessageCount(), - mailboxDeletionEvent.getTotalDeletedSize())); + mailboxDeletionEvent.getDeletedMessageCount(), + mailboxDeletionEvent.getTotalDeletedSize())) + .block(); } } diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManager.java index d785f89..5d6cb08 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManager.java @@ -19,14 +19,15 @@ package org.apache.james.mailbox.store.quota; -import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.QuotaOperation; import org.apache.james.mailbox.quota.CurrentQuotaManager; +import reactor.core.publisher.Mono; + public interface StoreCurrentQuotaManager extends CurrentQuotaManager { - void increase(QuotaOperation quotaOperation) throws MailboxException; + Mono<Void> increase(QuotaOperation quotaOperation); - void decrease(QuotaOperation quotaOperation) throws MailboxException; + Mono<Void> decrease(QuotaOperation quotaOperation); } diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreQuotaManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreQuotaManager.java index 82d227d..a731e91 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreQuotaManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreQuotaManager.java @@ -35,6 +35,8 @@ import org.apache.james.mailbox.quota.CurrentQuotaManager; import org.apache.james.mailbox.quota.MaxQuotaManager; import org.apache.james.mailbox.quota.QuotaManager; +import reactor.core.publisher.Mono; + /** * Default implementation for the Quota Manager. * @@ -54,7 +56,7 @@ public class StoreQuotaManager implements QuotaManager { public Quota<QuotaCountLimit, QuotaCountUsage> getMessageQuota(QuotaRoot quotaRoot) throws MailboxException { Map<Scope, QuotaCountLimit> maxMessageDetails = maxQuotaManager.listMaxMessagesDetails(quotaRoot); return Quota.<QuotaCountLimit, QuotaCountUsage>builder() - .used(currentQuotaManager.getCurrentMessageCount(quotaRoot)) + .used(Mono.from(currentQuotaManager.getCurrentMessageCount(quotaRoot)).block()) .computedLimit(maxQuotaManager.getMaxMessage(maxMessageDetails).orElse(QuotaCountLimit.unlimited())) .limitsByScope(maxMessageDetails) .build(); @@ -65,7 +67,7 @@ public class StoreQuotaManager implements QuotaManager { public Quota<QuotaSizeLimit, QuotaSizeUsage> getStorageQuota(QuotaRoot quotaRoot) throws MailboxException { Map<Scope, QuotaSizeLimit> maxStorageDetails = maxQuotaManager.listMaxStorageDetails(quotaRoot); return Quota.<QuotaSizeLimit, QuotaSizeUsage>builder() - .used(currentQuotaManager.getCurrentStorage(quotaRoot)) + .used(Mono.from(currentQuotaManager.getCurrentStorage(quotaRoot)).block()) .computedLimit(maxQuotaManager.getMaxStorage(maxStorageDetails).orElse(QuotaSizeLimit.unlimited())) .limitsByScope(maxStorageDetails) .build(); diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdaterTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdaterTest.java index 6825773..a43ad52 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdaterTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdaterTest.java @@ -96,6 +96,7 @@ class ListeningCurrentQuotaUpdaterTest { when(added.getUids()).thenReturn(Lists.newArrayList(MessageUid.of(36), MessageUid.of(38))); when(added.getUsername()).thenReturn(USERNAME_BENWA); when(mockedQuotaRootResolver.getQuotaRoot(eq(MAILBOX_ID))).thenReturn(QUOTA_ROOT); + when(mockedCurrentQuotaManager.increase(QUOTA)).thenAnswer(any -> Mono.empty()); testee.event(added); @@ -111,6 +112,7 @@ class ListeningCurrentQuotaUpdaterTest { when(expunged.getMailboxId()).thenReturn(MAILBOX_ID); when(expunged.getUsername()).thenReturn(USERNAME_BENWA); when(mockedQuotaRootResolver.getQuotaRoot(eq(MAILBOX_ID))).thenReturn(QUOTA_ROOT); + when(mockedCurrentQuotaManager.decrease(QUOTA)).thenAnswer(any -> Mono.empty()); testee.event(expunged); @@ -145,6 +147,8 @@ class ListeningCurrentQuotaUpdaterTest { @Test void mailboxDeletionEventShouldDecreaseCurrentQuotaValues() throws Exception { + QuotaOperation operation = new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(5)); + MailboxListener.MailboxDeletion deletion = mock(MailboxListener.MailboxDeletion.class); when(deletion.getQuotaRoot()).thenReturn(QUOTA_ROOT); when(deletion.getDeletedMessageCount()).thenReturn(QuotaCountUsage.count(10)); @@ -152,10 +156,11 @@ class ListeningCurrentQuotaUpdaterTest { when(deletion.getMailboxId()).thenReturn(MAILBOX_ID); when(deletion.getUsername()).thenReturn(USERNAME_BENWA); when(mockedQuotaRootResolver.getQuotaRoot(eq(MAILBOX_ID))).thenReturn(QUOTA_ROOT); + when(mockedCurrentQuotaManager.decrease(operation)).thenAnswer(any -> Mono.empty()); testee.event(deletion); - verify(mockedCurrentQuotaManager).decrease(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(5))); + verify(mockedCurrentQuotaManager).decrease(operation); } @Test diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManagerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManagerTest.java index df39dd2..02701fb 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManagerTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManagerTest.java @@ -30,6 +30,8 @@ import org.apache.james.mailbox.model.QuotaRoot; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; + public abstract class StoreCurrentQuotaManagerTest { private static final QuotaRoot QUOTA_ROOT = QuotaRoot.quotaRoot("benwa", Optional.empty()); @@ -44,32 +46,32 @@ public abstract class StoreCurrentQuotaManagerTest { @Test void getCurrentStorageShouldReturnZeroByDefault() throws Exception { - assertThat(testee.getCurrentStorage(QUOTA_ROOT)).isEqualTo(QuotaSizeUsage.size(0)); + assertThat(Mono.from(testee.getCurrentStorage(QUOTA_ROOT)).block()).isEqualTo(QuotaSizeUsage.size(0)); } @Test void increaseShouldWork() throws Exception { - testee.increase(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100))); + testee.increase(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100))).block(); - assertThat(testee.getCurrentMessageCount(QUOTA_ROOT)).isEqualTo(QuotaCountUsage.count(10)); - assertThat(testee.getCurrentStorage(QUOTA_ROOT)).isEqualTo(QuotaSizeUsage.size(100)); + assertThat(Mono.from(testee.getCurrentMessageCount(QUOTA_ROOT)).block()).isEqualTo(QuotaCountUsage.count(10)); + assertThat(Mono.from(testee.getCurrentStorage(QUOTA_ROOT)).block()).isEqualTo(QuotaSizeUsage.size(100)); } @Test void decreaseShouldWork() throws Exception { - testee.increase(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(20), QuotaSizeUsage.size(200))); + testee.increase(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(20), QuotaSizeUsage.size(200))).block(); - testee.decrease(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100))); + testee.decrease(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100))).block(); - assertThat(testee.getCurrentMessageCount(QUOTA_ROOT)).isEqualTo(QuotaCountUsage.count(10)); - assertThat(testee.getCurrentStorage(QUOTA_ROOT)).isEqualTo(QuotaSizeUsage.size(100)); + assertThat(Mono.from(testee.getCurrentMessageCount(QUOTA_ROOT)).block()).isEqualTo(QuotaCountUsage.count(10)); + assertThat(Mono.from(testee.getCurrentStorage(QUOTA_ROOT)).block()).isEqualTo(QuotaSizeUsage.size(100)); } @Test void decreaseShouldNotFailWhenItLeadsToNegativeValues() throws Exception { - testee.decrease(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100))); + testee.decrease(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100))).block(); - assertThat(testee.getCurrentMessageCount(QUOTA_ROOT)).isEqualTo(QuotaCountUsage.count(-10)); - assertThat(testee.getCurrentStorage(QUOTA_ROOT)).isEqualTo(QuotaSizeUsage.size(-100)); + assertThat(Mono.from(testee.getCurrentMessageCount(QUOTA_ROOT)).block()).isEqualTo(QuotaCountUsage.count(-10)); + assertThat(Mono.from(testee.getCurrentStorage(QUOTA_ROOT)).block()).isEqualTo(QuotaSizeUsage.size(-100)); } } diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreQuotaManagerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreQuotaManagerTest.java index fbaba67..205f827 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreQuotaManagerTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreQuotaManagerTest.java @@ -38,6 +38,8 @@ import org.apache.james.mailbox.quota.MaxQuotaManager; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; + class StoreQuotaManagerTest { StoreQuotaManager testee; @@ -57,7 +59,9 @@ class StoreQuotaManagerTest { @Test void getMessageQuotaShouldWorkWithNumericValues() throws Exception { when(mockedMaxQuotaManager.getMaxMessage(any(Map.class))).thenReturn(Optional.of(QuotaCountLimit.count(360L))); - when(mockedCurrentQuotaManager.getCurrentMessageCount(quotaRoot)).thenReturn(QuotaCountUsage.count(36L)); + when(mockedCurrentQuotaManager.getCurrentMessageCount(quotaRoot)).thenAnswer(any -> + Mono.fromCallable(() -> QuotaCountUsage.count(36L))); + assertThat(testee.getMessageQuota(quotaRoot)).isEqualTo( Quota.<QuotaCountLimit, QuotaCountUsage>builder().used(QuotaCountUsage.count(36)).computedLimit(QuotaCountLimit.count(360)).build()); } @@ -66,7 +70,9 @@ class StoreQuotaManagerTest { @Test void getStorageQuotaShouldWorkWithNumericValues() throws Exception { when(mockedMaxQuotaManager.getMaxStorage(any(Map.class))).thenReturn(Optional.of(QuotaSizeLimit.size(360L))); - when(mockedCurrentQuotaManager.getCurrentStorage(quotaRoot)).thenReturn(QuotaSizeUsage.size(36L)); + when(mockedCurrentQuotaManager.getCurrentStorage(quotaRoot)).thenAnswer(any -> + Mono.fromCallable(() -> QuotaSizeUsage.size(36L))); + assertThat(testee.getStorageQuota(quotaRoot)).isEqualTo( Quota.<QuotaSizeLimit, QuotaSizeUsage>builder().used(QuotaSizeUsage.size(36)).computedLimit(QuotaSizeLimit.size(360)).build()); } @@ -75,7 +81,8 @@ class StoreQuotaManagerTest { @Test void getStorageQuotaShouldCalculateCurrentQuotaWhenUnlimited() throws Exception { when(mockedMaxQuotaManager.getMaxStorage(any(Map.class))).thenReturn(Optional.of(QuotaSizeLimit.unlimited())); - when(mockedCurrentQuotaManager.getCurrentStorage(quotaRoot)).thenReturn(QuotaSizeUsage.size(36L)); + when(mockedCurrentQuotaManager.getCurrentStorage(quotaRoot)).thenAnswer(any -> + Mono.fromCallable(() -> QuotaSizeUsage.size(36L))); assertThat(testee.getStorageQuota(quotaRoot)).isEqualTo( Quota.<QuotaSizeLimit, QuotaSizeUsage>builder().used(QuotaSizeUsage.size(36)).computedLimit(QuotaSizeLimit.unlimited()).build()); @@ -85,7 +92,8 @@ class StoreQuotaManagerTest { @Test void getMessageQuotaShouldCalculateCurrentQuotaWhenUnlimited() throws Exception { when(mockedMaxQuotaManager.getMaxMessage(any(Map.class))).thenReturn(Optional.of(QuotaCountLimit.unlimited())); - when(mockedCurrentQuotaManager.getCurrentMessageCount(quotaRoot)).thenReturn(QuotaCountUsage.count(36L)); + when(mockedCurrentQuotaManager.getCurrentMessageCount(quotaRoot)).thenAnswer(any -> + Mono.fromCallable(() -> QuotaCountUsage.count(36L))); assertThat(testee.getMessageQuota(quotaRoot)).isEqualTo( Quota.<QuotaCountLimit, QuotaCountUsage>builder().used(QuotaCountUsage.count(36)).computedLimit(QuotaCountLimit.unlimited()).build()); diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserQuotaRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserQuotaRoutesTest.java index e320c5a..d656dcc 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserQuotaRoutesTest.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserQuotaRoutesTest.java @@ -852,7 +852,7 @@ class UserQuotaRoutesTest { maxQuotaManager.setMaxStorage(userQuotaRootResolver.forUser(BOB), QuotaSizeLimit.size(80)); maxQuotaManager.setMaxMessage(userQuotaRootResolver.forUser(BOB), QuotaCountLimit.count(100)); - currentQuotaManager.increase(quotaIncrease); + currentQuotaManager.increase(quotaIncrease).block(); JsonPath jsonPath = when() @@ -881,7 +881,7 @@ class UserQuotaRoutesTest { maxQuotaManager.setMaxStorage(userQuotaRootResolver.forUser(BOB), QuotaSizeLimit.unlimited()); maxQuotaManager.setMaxMessage(userQuotaRootResolver.forUser(BOB), QuotaCountLimit.unlimited()); - currentQuotaManager.increase(quotaIncrease); + currentQuotaManager.increase(quotaIncrease).block(); JsonPath jsonPath = when() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
