This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit dac815063fb735c69d2b22e39979ba420eb52cea
Author: Benoit Tellier <[email protected]>
AuthorDate: Thu May 13 15:59:47 2021 +0700

    [REFACTORING] Cassandra implementation should depend on interfaces for 
UidProvider and ModSeqProvider
    
    This enables people to inject their own generation logic, that might rely
    on stronger mechanisms than LWTs.
    
    Candidates might be Atomix, Zookeeper, Consul, Etcd, etc...
---
 .../CassandraMailboxSessionMapperFactory.java         |  7 +++----
 .../cassandra/mail/CassandraMessageIdMapper.java      |  7 ++++---
 .../cassandra/mail/CassandraMessageMapper.java        | 16 +++++++++-------
 .../cassandra/mail/CassandraModSeqProvider.java       | 19 +++++++++----------
 .../mailbox/cassandra/mail/CassandraUidProvider.java  | 11 ++++++++---
 .../cassandra/mail/CassandraModSeqProviderTest.java   |  4 ++--
 .../mailbox/cassandra/mail/utils/GuiceUtils.java      |  3 +++
 .../james/mailbox/store/mail/ModSeqProvider.java      |  6 ++++++
 .../apache/james/mailbox/store/mail/UidProvider.java  | 16 ++++++++++++++++
 9 files changed, 60 insertions(+), 29 deletions(-)

diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 671b894..4194924 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -50,7 +50,6 @@ import 
org.apache.james.mailbox.cassandra.mail.CassandraMessageIdMapper;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageMapper;
 import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider;
-import org.apache.james.mailbox.cassandra.mail.CassandraUidProvider;
 import org.apache.james.mailbox.cassandra.mail.CassandraUserMailboxRightsDAO;
 import 
org.apache.james.mailbox.cassandra.mail.task.RecomputeMailboxCountersService;
 import org.apache.james.mailbox.cassandra.user.CassandraSubscriptionMapper;
@@ -73,8 +72,8 @@ public class CassandraMailboxSessionMapperFactory extends 
MailboxSessionMapperFa
     protected static final String ATTACHMENTMAPPER = "ATTACHMENTMAPPER";
 
     private final Session session;
-    private final CassandraUidProvider uidProvider;
-    private final CassandraModSeqProvider modSeqProvider;
+    private final UidProvider uidProvider;
+    private final ModSeqProvider modSeqProvider;
     private final CassandraMessageDAO messageDAO;
     private final CassandraMessageDAOV3 messageDAOV3;
     private final CassandraMessageIdDAO messageIdDAO;
@@ -101,7 +100,7 @@ public class CassandraMailboxSessionMapperFactory extends 
MailboxSessionMapperFa
     private final CassandraConfiguration cassandraConfiguration;
 
     @Inject
-    public CassandraMailboxSessionMapperFactory(CassandraUidProvider 
uidProvider, CassandraModSeqProvider modSeqProvider, Session session,
+    public CassandraMailboxSessionMapperFactory(UidProvider uidProvider, 
CassandraModSeqProvider modSeqProvider, Session session,
                                                 CassandraMessageDAO messageDAO,
                                                 CassandraMessageDAOV3 
messageDAOV3, CassandraMessageIdDAO messageIdDAO, 
CassandraMessageIdToImapUidDAO imapUidDAO,
                                                 CassandraMailboxCounterDAO 
mailboxCounterDAO, CassandraMailboxRecentsDAO mailboxRecentsDAO, 
CassandraMailboxDAO mailboxDAO,
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 9ac0b67..84407d8 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -49,6 +49,7 @@ import org.apache.james.mailbox.store.MailboxReactorUtils;
 import org.apache.james.mailbox.store.mail.MailboxMapper;
 import org.apache.james.mailbox.store.mail.MessageIdMapper;
 import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
+import org.apache.james.mailbox.store.mail.ModSeqProvider;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.util.FunctionalUtils;
 import org.apache.james.util.ReactorUtils;
@@ -79,14 +80,14 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
     private final CassandraMessageDAO messageDAO;
     private final CassandraMessageDAOV3 messageDAOV3;
     private final CassandraIndexTableHandler indexTableHandler;
-    private final CassandraModSeqProvider modSeqProvider;
+    private final ModSeqProvider modSeqProvider;
     private final AttachmentLoader attachmentLoader;
     private final CassandraConfiguration cassandraConfiguration;
 
     public CassandraMessageIdMapper(MailboxMapper mailboxMapper, 
CassandraMailboxDAO mailboxDAO, CassandraAttachmentMapper attachmentMapper,
                                     CassandraMessageIdToImapUidDAO imapUidDAO, 
CassandraMessageIdDAO messageIdDAO,
                                     CassandraMessageDAO messageDAO, 
CassandraMessageDAOV3 messageDAOV3, CassandraIndexTableHandler 
indexTableHandler,
-                                    CassandraModSeqProvider modSeqProvider, 
CassandraConfiguration cassandraConfiguration) {
+                                    ModSeqProvider modSeqProvider, 
CassandraConfiguration cassandraConfiguration) {
 
         this.mailboxMapper = mailboxMapper;
         this.mailboxDAO = mailboxDAO;
@@ -308,7 +309,7 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
         if (identicalFlags(oldComposedId, newFlags)) {
             return Mono.just(Pair.of(oldComposedId.getFlags(), oldComposedId));
         } else {
-            return modSeqProvider.nextModSeq(cassandraId)
+            return modSeqProvider.nextModSeqReactive(cassandraId)
                 .map(modSeq -> new ComposedMessageIdWithMetaData(
                     oldComposedId.getComposedMessageId(),
                     newFlags,
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index f929b83..2a1f903 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -58,6 +58,8 @@ import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.mailbox.model.UpdatedFlags;
 import org.apache.james.mailbox.store.FlagsUpdateCalculator;
 import org.apache.james.mailbox.store.mail.MessageMapper;
+import org.apache.james.mailbox.store.mail.ModSeqProvider;
+import org.apache.james.mailbox.store.mail.UidProvider;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
 import org.apache.james.task.Task;
@@ -82,8 +84,8 @@ public class CassandraMessageMapper implements MessageMapper {
     private static final Duration MIN_RETRY_BACKOFF = Duration.ofMillis(10);
     private static final Duration MAX_RETRY_BACKOFF = Duration.ofMillis(1000);
 
-    private final CassandraModSeqProvider modSeqProvider;
-    private final CassandraUidProvider uidProvider;
+    private final ModSeqProvider modSeqProvider;
+    private final UidProvider uidProvider;
     private final CassandraMessageDAO messageDAO;
     private final CassandraMessageDAOV3 messageDAOV3;
     private final CassandraMessageIdDAO messageIdDAO;
@@ -99,7 +101,7 @@ public class CassandraMessageMapper implements MessageMapper 
{
     private final RecomputeMailboxCountersService 
recomputeMailboxCountersService;
     private final SecureRandom secureRandom;
 
-    public CassandraMessageMapper(CassandraUidProvider uidProvider, 
CassandraModSeqProvider modSeqProvider,
+    public CassandraMessageMapper(UidProvider uidProvider, ModSeqProvider 
modSeqProvider,
                                   CassandraAttachmentMapper attachmentMapper,
                                   CassandraMessageDAO messageDAO, 
CassandraMessageDAOV3 messageDAOV3, CassandraMessageIdDAO messageIdDAO,
                                   CassandraMessageIdToImapUidDAO imapUidDAO, 
CassandraMailboxCounterDAO mailboxCounterDAO,
@@ -343,10 +345,10 @@ public class CassandraMessageMapper implements 
MessageMapper {
 
     private Mono<MailboxMessage> addUidAndModseq(MailboxMessage message, 
CassandraId mailboxId) {
         Mono<MessageUid> messageUidMono = uidProvider
-            .nextUids(mailboxId)
+            .nextUidReactive(mailboxId)
             .switchIfEmpty(Mono.error(() -> new MailboxException("Can not find 
a UID to save " + message.getMessageId() + " in " + mailboxId)));
 
-        Mono<ModSeq> nextModSeqMono = modSeqProvider.nextModSeq(mailboxId)
+        Mono<ModSeq> nextModSeqMono = 
modSeqProvider.nextModSeqReactive(mailboxId)
             .switchIfEmpty(Mono.error(() -> new MailboxException("Can not find 
a MODSEQ to save " + message.getMessageId() + " in " + mailboxId)));
 
         return Mono.zip(messageUidMono, nextModSeqMono)
@@ -438,7 +440,7 @@ public class CassandraMessageMapper implements 
MessageMapper {
     }
 
     private Mono<ModSeq> computeNewModSeq(CassandraId mailboxId) {
-        return modSeqProvider.nextModSeq(mailboxId)
+        return modSeqProvider.nextModSeqReactive(mailboxId)
             .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> new 
RuntimeException("ModSeq generation failed for mailbox " + 
mailboxId.asUuid())));
     }
 
@@ -492,7 +494,7 @@ public class CassandraMessageMapper implements 
MessageMapper {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
         Mono<List<MessageUid>> uids = uidProvider.nextUids(mailboxId, 
messages.size());
-        Mono<ModSeq> nextModSeq = modSeqProvider.nextModSeq(mailboxId);
+        Mono<ModSeq> nextModSeq = modSeqProvider.nextModSeqReactive(mailboxId);
 
         Mono<List<MailboxMessage>> messagesWithUidAndModSeq = 
nextModSeq.flatMap(modSeq -> uids.map(uidList -> Pair.of(uidList, modSeq)))
             .map(pair -> pair.getKey().stream()
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
index e41df05..65896d2 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
@@ -121,19 +121,16 @@ public class CassandraModSeqProvider implements 
ModSeqProvider {
             .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))));
     }
 
-
-
     @Override
     public ModSeq nextModSeq(Mailbox mailbox) throws MailboxException {
-        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
-        return nextModSeq(mailboxId)
+        return nextModSeqReactive(mailbox.getMailboxId())
             .blockOptional()
-            .orElseThrow(() -> new MailboxException("Can not retrieve modseq 
for " + mailboxId));
+            .orElseThrow(() -> new MailboxException("Can not retrieve modseq 
for " + mailbox.getMailboxId()));
     }
 
     @Override
     public ModSeq nextModSeq(MailboxId mailboxId) throws MailboxException {
-        return nextModSeq((CassandraId) mailboxId)
+        return nextModSeqReactive(mailboxId)
             .blockOptional()
             .orElseThrow(() -> new MailboxException("Can not retrieve modseq 
for " + mailboxId));
     }
@@ -184,13 +181,15 @@ public class CassandraModSeqProvider implements 
ModSeqProvider {
         return Optional.empty();
     }
 
-    public Mono<ModSeq> nextModSeq(CassandraId mailboxId) {
+    @Override
+    public Mono<ModSeq> nextModSeqReactive(MailboxId mailboxId) {
+        CassandraId cassandraId = (CassandraId) mailboxId;
         Duration firstBackoff = Duration.ofMillis(10);
 
-        return findHighestModSeq(mailboxId)
+        return findHighestModSeq(cassandraId)
             .flatMap(maybeHighestModSeq -> maybeHighestModSeq
-                        .map(highestModSeq -> tryUpdateModSeq(mailboxId, 
highestModSeq))
-                        .orElseGet(() -> tryInsertModSeq(mailboxId, 
ModSeq.first())))
+                        .map(highestModSeq -> tryUpdateModSeq(cassandraId, 
highestModSeq))
+                        .orElseGet(() -> tryInsertModSeq(cassandraId, 
ModSeq.first())))
             .single()
             .retryWhen(Retry.backoff(maxModSeqRetries, 
firstBackoff).scheduler(Schedulers.elastic()));
     }
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
index afc7111..877c15c 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
@@ -105,12 +105,14 @@ public class CassandraUidProvider implements UidProvider {
     @Override
     public MessageUid nextUid(MailboxId mailboxId) throws MailboxException {
         CassandraId cassandraId = (CassandraId) mailboxId;
-        return nextUids(cassandraId)
+        return nextUidReactive(cassandraId)
             .blockOptional()
             .orElseThrow(() -> new MailboxException("Error during Uid 
update"));
     }
 
-    public Mono<MessageUid> nextUids(CassandraId cassandraId) {
+    @Override
+    public Mono<MessageUid> nextUidReactive(MailboxId mailboxId) {
+        CassandraId cassandraId = (CassandraId) mailboxId;
         Mono<MessageUid> updateUid = findHighestUid(cassandraId)
             .flatMap(messageUid -> tryUpdateUid(cassandraId, messageUid));
 
@@ -122,7 +124,10 @@ public class CassandraUidProvider implements UidProvider {
             .retryWhen(Retry.backoff(maxUidRetries, 
firstBackoff).scheduler(Schedulers.elastic()));
     }
 
-    public Mono<List<MessageUid>> nextUids(CassandraId cassandraId, int count) 
{
+    @Override
+    public Mono<List<MessageUid>> nextUids(MailboxId mailboxId, int count) {
+        CassandraId cassandraId = (CassandraId) mailboxId;
+
         Mono<List<MessageUid>> updateUid = findHighestUid(cassandraId)
             .flatMap(messageUid -> tryUpdateUid(cassandraId, messageUid, count)
                 .map(highest -> range(messageUid, highest)));
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
index 8773221..8ad2b1e 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProviderTest.java
@@ -115,10 +115,10 @@ class CassandraModSeqProviderTest {
                     .times(1)
                     .whenQueryStartsWith("SELECT nextModseq FROM modseq WHERE 
mailboxId=:mailboxId;"));
 
-        CompletableFuture<ModSeq> operation1 = 
modSeqProvider.nextModSeq(CASSANDRA_ID)
+        CompletableFuture<ModSeq> operation1 = 
modSeqProvider.nextModSeqReactive(CASSANDRA_ID)
             .subscribeOn(Schedulers.elastic())
             .toFuture();
-        CompletableFuture<ModSeq> operation2 = 
modSeqProvider.nextModSeq(CASSANDRA_ID)
+        CompletableFuture<ModSeq> operation2 = 
modSeqProvider.nextModSeqReactive(CASSANDRA_ID)
             .subscribeOn(Schedulers.elastic())
             .toFuture();
 
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/GuiceUtils.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/GuiceUtils.java
index 1c6481a..c1c9ba1 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/GuiceUtils.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/GuiceUtils.java
@@ -38,8 +38,10 @@ import 
org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
 import org.apache.james.json.DTO;
 import org.apache.james.json.DTOModule;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.cassandra.mail.CassandraUidProvider;
 import org.apache.james.mailbox.cassandra.mail.eventsourcing.acl.ACLModule;
 import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.store.mail.UidProvider;
 
 import com.datastax.driver.core.Session;
 import com.google.common.collect.ImmutableSet;
@@ -73,6 +75,7 @@ public class GuiceUtils {
                                         CassandraConfiguration configuration) {
         return Modules.combine(
             binder -> 
binder.bind(MessageId.Factory.class).toInstance(messageIdFactory),
+            binder -> 
binder.bind(UidProvider.class).to(CassandraUidProvider.class),
             binder -> binder.bind(BlobId.Factory.class).toInstance(new 
HashBlobId.Factory()),
             binder -> binder.bind(BlobStore.class).toProvider(() -> 
CassandraBlobStoreFactory.forTesting(session).passthrough()),
             binder -> binder.bind(Session.class).toInstance(session),
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/ModSeqProvider.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/ModSeqProvider.java
index 7c96c74..a363401 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/ModSeqProvider.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/ModSeqProvider.java
@@ -23,6 +23,8 @@ import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxId;
 
+import reactor.core.publisher.Mono;
+
 /**
  * Take care of provide mod-seqences for a given {@link Mailbox}. Be aware 
that implementations
  * need to be thread-safe!
@@ -56,4 +58,8 @@ public interface ModSeqProvider {
      * Return the highest mod-sequence which were used for the {@link Mailbox}
      */
     ModSeq highestModSeq(MailboxId mailboxId) throws MailboxException;
+
+    default Mono<ModSeq> nextModSeqReactive(MailboxId mailboxId) {
+        return Mono.fromCallable(() -> nextModSeq(mailboxId));
+    }
 }
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/UidProvider.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/UidProvider.java
index 57c3f8c..d4e6b38 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/UidProvider.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/UidProvider.java
@@ -18,6 +18,7 @@
  ****************************************************************/
 package org.apache.james.mailbox.store.mail;
 
+import java.util.List;
 import java.util.Optional;
 
 import org.apache.james.mailbox.MessageUid;
@@ -25,6 +26,11 @@ import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxId;
 
+import com.github.steveash.guavate.Guavate;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 /**
  * Take care of provide uids for a given {@link Mailbox}. Be aware that 
implementations
  * need to be thread-safe!
@@ -44,4 +50,14 @@ public interface UidProvider {
     Optional<MessageUid> lastUid(Mailbox mailbox) throws MailboxException;
 
     MessageUid nextUid(MailboxId mailboxId) throws MailboxException;
+
+    default Mono<MessageUid> nextUidReactive(MailboxId mailboxId) {
+        return Mono.fromCallable(() -> nextUid(mailboxId));
+    }
+
+    default Mono<List<MessageUid>> nextUids(MailboxId mailboxId, int count) {
+        return Flux.range(0, count)
+            .flatMap(i -> nextUidReactive(mailboxId))
+            .collect(Guavate.toImmutableList());
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to