This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new dad460732c JAMES-3516 Rely on Cassandra for Thread lookups (#2750)
dad460732c is described below

commit dad460732c3ff5bfcb98904051f4c863c84899cb
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Fri Jun 27 15:22:00 2025 +0200

    JAMES-3516 Rely on Cassandra for Thread lookups (#2750)
    
    - Reorganise the lookup DAO in order to include the ThreadId as
     part of the primary key and relegate the messageId as a clustering
     key. Acceptable for our usage pattern, this enables listing content of
     a thread.
    
     Migration strategy: none. Truncate the old v2 table and start with
     an empty v3 table which is acceptable.
    
      - Rely on this method for thread resolution instead of the search index
    
      - Make sure that messages in the thread are accessible to
      the end user (rights resolution)
---
 .../CassandraThreadIdGuessingAlgorithm.java        | 43 +++++++++++++---------
 .../mailbox/cassandra/DeleteMessageListener.java   | 33 +++++++----------
 .../mailbox/cassandra/ids/CassandraMessageId.java  |  4 ++
 .../cassandra/mail/CassandraThreadLookupDAO.java   | 35 +++++++++++++++---
 .../modules/CassandraThreadDataDefinition.java     |  3 +-
 .../table/CassandraThreadLookupTable.java          |  2 +-
 .../cassandra/CassandraMailboxManagerTest.java     | 18 ++++-----
 .../CassandraThreadIdGuessingAlgorithmTest.java    |  6 ++-
 .../mail/CassandraThreadLookupDAOTest.java         | 43 +++++++++++++++-------
 9 files changed, 119 insertions(+), 68 deletions(-)

diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithm.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithm.java
index 812191c7cd..abea0e7884 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithm.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithm.java
@@ -19,23 +19,26 @@
 
 package org.apache.james.mailbox.cassandra;
 
+import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 import jakarta.inject.Inject;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageIdManager;
 import org.apache.james.mailbox.cassandra.mail.CassandraThreadDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraThreadLookupDAO;
 import org.apache.james.mailbox.exception.ThreadNotFoundException;
+import org.apache.james.mailbox.model.FetchGroup;
 import org.apache.james.mailbox.model.MessageId;
-import org.apache.james.mailbox.model.MultimailboxesSearchQuery;
-import org.apache.james.mailbox.model.SearchQuery;
+import org.apache.james.mailbox.model.MessageResult;
 import org.apache.james.mailbox.model.ThreadId;
 import org.apache.james.mailbox.store.mail.ThreadIdGuessingAlgorithm;
 import org.apache.james.mailbox.store.mail.model.MimeMessageId;
@@ -46,17 +49,18 @@ import com.google.common.hash.Hashing;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.publisher.SynchronousSink;
 
 public class CassandraThreadIdGuessingAlgorithm implements 
ThreadIdGuessingAlgorithm {
     private static final boolean DISABLE_THREADS = 
Boolean.valueOf(System.getProperty("james.mailbox.threads.disable", "false"));
 
-    private final MailboxManager mailboxManager;
+    private final MessageIdManager messageIdManager;
     private final CassandraThreadDAO threadDAO;
     private final CassandraThreadLookupDAO threadLookupDAO;
 
     @Inject
-    public CassandraThreadIdGuessingAlgorithm(MailboxManager mailboxManager, 
CassandraThreadDAO threadDAO, CassandraThreadLookupDAO threadLookupDAO) {
-        this.mailboxManager = mailboxManager;
+    public CassandraThreadIdGuessingAlgorithm(MessageIdManager 
messageIdManager, CassandraThreadDAO threadDAO, CassandraThreadLookupDAO 
threadLookupDAO) {
+        this.messageIdManager = messageIdManager;
         this.threadDAO = threadDAO;
         this.threadLookupDAO = threadLookupDAO;
     }
@@ -78,7 +82,7 @@ public class CassandraThreadIdGuessingAlgorithm implements 
ThreadIdGuessingAlgor
             .switchIfEmpty(Mono.just(ThreadId.fromBaseMessageId(messageId)))
             .flatMap(threadId -> threadDAO
                 .insertSome(session.getUser(), hashMimeMessageIds, messageId, 
threadId, hashBaseSubject)
-                .then(threadLookupDAO.insert(messageId, session.getUser(), 
hashMimeMessageIds))
+                .then(threadLookupDAO.insert(messageId, threadId, 
session.getUser(), hashMimeMessageIds))
                 .then(Mono.just(threadId)));
     }
 
@@ -88,16 +92,21 @@ public class CassandraThreadIdGuessingAlgorithm implements 
ThreadIdGuessingAlgor
             return Flux.just(threadId.getBaseMessageId());
         }
 
-        SearchQuery searchQuery = SearchQuery.builder()
-            .andCriteria(SearchQuery.threadId(threadId))
-            .sorts(new SearchQuery.Sort(SearchQuery.Sort.SortClause.Arrival, 
SearchQuery.Sort.Order.NATURAL))
-            .build();
-
-        MultimailboxesSearchQuery expression = MultimailboxesSearchQuery
-            .from(searchQuery)
-            .build();
-
-        return Flux.from(mailboxManager.search(expression, session, 
Integer.MAX_VALUE))
+        BiConsumer<Collection<MessageId>, 
SynchronousSink<Collection<MessageId>>> throwIfEmpty = (ids, sink) -> {
+            // Handle unpopulated lookup dao
+            if (ids.isEmpty()) {
+                sink.error(new ThreadNotFoundException(threadId));
+            } else {
+                sink.next(ids);
+            }
+        };
+        return threadLookupDAO.selectAll(threadId)
+            .collectList()
+            .handle(throwIfEmpty)
+            .flatMapMany(messageIds -> 
Flux.from(messageIdManager.getMessagesReactive(messageIds, FetchGroup.MINIMAL, 
session)))
+            
.sort(Comparator.comparing(MessageResult::getInternalDate).thenComparing(m -> 
m.getMessageId().serialize()))
+            .map(MessageResult::getMessageId)
+            .distinct()
             .switchIfEmpty(Mono.error(() -> new 
ThreadNotFoundException(threadId)));
     }
 
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
index e01cc966d1..7e0a113607 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
@@ -50,20 +50,18 @@ import 
org.apache.james.mailbox.cassandra.mail.CassandraMailboxRecentsDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV3;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
-import org.apache.james.mailbox.cassandra.mail.CassandraMessageMetadata;
 import org.apache.james.mailbox.cassandra.mail.CassandraThreadDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraThreadLookupDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraUserMailboxRightsDAO;
 import org.apache.james.mailbox.cassandra.mail.MessageRepresentation;
 import org.apache.james.mailbox.events.MailboxEvents.Expunged;
 import org.apache.james.mailbox.events.MailboxEvents.MailboxDeletion;
-import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.mailbox.model.MailboxACL;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.model.MessageId;
-import org.apache.james.mailbox.model.MessageMetaData;
 import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.mailbox.model.ThreadId;
 import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.util.streams.Limit;
 import org.reactivestreams.Publisher;
@@ -232,11 +230,10 @@ public class DeleteMessageListener implements 
EventListener.ReactiveGroupEventLi
         int prefetch = 1;
         return Flux.mergeDelayError(prefetch,
                 messageIdDAO.retrieveMessages(mailboxId, MessageRange.all(), 
Limit.unlimited())
-                    .map(CassandraMessageMetadata::getComposedMessageId)
-                    .map(ComposedMessageIdWithMetaData::getComposedMessageId)
-                    .concatMap(metadata -> 
handleMessageDeletionAsPartOfMailboxDeletion((CassandraMessageId) 
metadata.getMessageId(), mailboxId, path.getUser())
-                        .then(imapUidDAO.delete((CassandraMessageId) 
metadata.getMessageId(), mailboxId))
-                        .then(messageIdDAO.delete(mailboxId, 
metadata.getUid()))),
+                    .concatMap(metadata -> 
handleMessageDeletionAsPartOfMailboxDeletion((CassandraMessageId) 
metadata.getComposedMessageId().getComposedMessageId().getMessageId(),
+                            metadata.getComposedMessageId().getThreadId(), 
mailboxId, path.getUser())
+                        .then(imapUidDAO.delete((CassandraMessageId) 
metadata.getComposedMessageId().getComposedMessageId().getMessageId(), 
mailboxId))
+                        .then(messageIdDAO.delete(mailboxId, 
metadata.getComposedMessageId().getComposedMessageId().getUid()))),
                 deleteAcl(mailboxId),
                 applicableFlagDAO.delete(mailboxId),
                 firstUnseenDAO.removeAll(mailboxId),
@@ -247,11 +244,9 @@ public class DeleteMessageListener implements 
EventListener.ReactiveGroupEventLi
     }
 
     private Mono<Void> handleMessageDeletion(Expunged expunged) {
-        return Flux.fromIterable(expunged.getExpunged()
-            .values())
-            .map(MessageMetaData::getMessageId)
-            .map(CassandraMessageId.class::cast)
-            .concatMap(messageId -> handleMessageDeletion(messageId, 
expunged.getMailboxId(), expunged.getMailboxPath().getUser()))
+        return Flux.fromIterable(expunged.getExpunged().values())
+            .concatMap(metaData -> handleMessageDeletion((CassandraMessageId) 
metaData.getMessageId(),
+                expunged.getMailboxId(), metaData.getThreadId(), 
expunged.getMailboxPath().getUser()))
             .then();
     }
 
@@ -261,7 +256,7 @@ public class DeleteMessageListener implements 
EventListener.ReactiveGroupEventLi
                 .then(aclMapper.delete(mailboxId)));
     }
 
-    private Mono<Void> handleMessageDeletion(CassandraMessageId messageId, 
MailboxId mailboxId, Username owner) {
+    private Mono<Void> handleMessageDeletion(CassandraMessageId messageId, 
MailboxId mailboxId, ThreadId threadId, Username owner) {
         return Mono.just(messageId)
             .filterWhen(this::isReferenced)
             .flatMap(id -> readMessage(id)
@@ -269,13 +264,13 @@ public class DeleteMessageListener implements 
EventListener.ReactiveGroupEventLi
                 .flatMap(message -> 
deleteUnreferencedAttachments(message).thenReturn(message))
                 .flatMap(this::deleteMessageBlobs)
                 .then(messageDAOV3.delete(messageId))
-                .then(threadLookupDAO.selectOneRow(messageId)
+                .then(threadLookupDAO.selectOneRow(threadId, messageId)
                     .flatMap(key -> threadDAO.deleteSome(key.getUsername(), 
key.getMimeMessageIds())
                         .collectList()))
-                .then(threadLookupDAO.deleteOneRow(messageId)));
+                .then(threadLookupDAO.deleteOneRow(threadId, messageId)));
     }
 
-    private Mono<Void> 
handleMessageDeletionAsPartOfMailboxDeletion(CassandraMessageId messageId, 
CassandraId excludedId, Username owner) {
+    private Mono<Void> 
handleMessageDeletionAsPartOfMailboxDeletion(CassandraMessageId messageId, 
ThreadId threadId, CassandraId excludedId, Username owner) {
         return Mono.just(messageId)
             .filterWhen(id -> isReferenced(id, excludedId))
             .flatMap(id -> readMessage(id)
@@ -283,10 +278,10 @@ public class DeleteMessageListener implements 
EventListener.ReactiveGroupEventLi
                 .flatMap(message -> 
deleteUnreferencedAttachments(message).thenReturn(message))
                 .flatMap(this::deleteMessageBlobs)
                 .then(messageDAOV3.delete(messageId))
-                .then(threadLookupDAO.selectOneRow(messageId)
+                .then(threadLookupDAO.selectOneRow(threadId, messageId)
                     .flatMap(key -> threadDAO.deleteSome(key.getUsername(), 
key.getMimeMessageIds())
                         .collectList()))
-                .then(threadLookupDAO.deleteOneRow(messageId)));
+                .then(threadLookupDAO.deleteOneRow(threadId, messageId)));
     }
 
     private Mono<MessageRepresentation> 
deleteMessageBlobs(MessageRepresentation message) {
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ids/CassandraMessageId.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ids/CassandraMessageId.java
index cdfafa706a..9c9913305a 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ids/CassandraMessageId.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ids/CassandraMessageId.java
@@ -46,6 +46,10 @@ public class CassandraMessageId implements MessageId {
         }
     }
 
+    public static CassandraMessageId of(UUID uuid) {
+        return new CassandraMessageId(uuid);
+    }
+
     private final UUID uuid;
 
     private CassandraMessageId(UUID uuid) {
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java
index f910c928f8..a3ba779a08 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java
@@ -30,6 +30,7 @@ import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSA
 import static 
org.apache.james.mailbox.cassandra.table.CassandraThreadLookupTable.MIME_MESSAGE_IDS;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraThreadLookupTable.TABLE_NAME;
 import static 
org.apache.james.mailbox.cassandra.table.CassandraThreadTable.USERNAME;
+import static 
org.apache.james.mailbox.cassandra.table.MessageIdToImapUid.THREAD_ID;
 
 import java.util.Set;
 
@@ -39,6 +40,7 @@ import 
org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
 import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.ThreadId;
 
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
@@ -48,6 +50,7 @@ import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
 import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class CassandraThreadLookupDAO {
@@ -56,6 +59,7 @@ public class CassandraThreadLookupDAO {
     private final CassandraAsyncExecutor executor;
     private final PreparedStatement insert;
     private final PreparedStatement select;
+    private final PreparedStatement selectAll;
     private final PreparedStatement delete;
 
     @Inject
@@ -64,35 +68,54 @@ public class CassandraThreadLookupDAO {
 
         insert = session.prepare(insertInto(TABLE_NAME)
             .value(MESSAGE_ID, bindMarker(MESSAGE_ID))
+            .value(THREAD_ID, bindMarker(THREAD_ID))
             .value(USERNAME, bindMarker(USERNAME))
             .value(MIME_MESSAGE_IDS, bindMarker(MIME_MESSAGE_IDS))
             .build());
 
         select = session.prepare(selectFrom(TABLE_NAME)
             .columns(USERNAME, MIME_MESSAGE_IDS)
-            .where(column(MESSAGE_ID).isEqualTo(bindMarker(MESSAGE_ID)))
+            .where(column(MESSAGE_ID).isEqualTo(bindMarker(MESSAGE_ID)),
+                column(THREAD_ID).isEqualTo(bindMarker(THREAD_ID)))
+            .build());
+
+        selectAll = session.prepare(selectFrom(TABLE_NAME)
+            .columns(MESSAGE_ID)
+            .where(column(THREAD_ID).isEqualTo(bindMarker(THREAD_ID)))
             .build());
 
         delete = session.prepare(deleteFrom(TABLE_NAME)
-            .where(column(MESSAGE_ID).isEqualTo(bindMarker(MESSAGE_ID)))
+            .where(column(MESSAGE_ID).isEqualTo(bindMarker(MESSAGE_ID)),
+                column(THREAD_ID).isEqualTo(bindMarker(THREAD_ID)))
             .build());
     }
 
-    public Mono<Void> insert(MessageId messageId, Username username, 
Set<Integer> hashMimeMessageIds) {
+    public Mono<Void> insert(MessageId messageId, ThreadId threadId, Username 
username, Set<Integer> hashMimeMessageIds) {
         return executor.executeVoid(insert.bind()
             .set(MESSAGE_ID, ((CassandraMessageId) messageId).get(), 
TypeCodecs.TIMEUUID)
+            .set(THREAD_ID, ((CassandraMessageId) 
threadId.getBaseMessageId()).get(), TypeCodecs.TIMEUUID)
             .set(USERNAME, username.asString(), TypeCodecs.TEXT)
             .set(MIME_MESSAGE_IDS, hashMimeMessageIds, SET_OF_INTS_CODEC));
     }
 
-    public Mono<ThreadTablePartitionKey> selectOneRow(MessageId messageId) {
+    public Flux<MessageId> selectAll(ThreadId threadId) {
+        return executor.executeRows(
+                selectAll.bind()
+                    .set(THREAD_ID, ((CassandraMessageId) 
threadId.getBaseMessageId()).get(), TypeCodecs.TIMEUUID))
+            .map(row -> CassandraMessageId.of(row.get(MESSAGE_ID, 
TypeCodecs.TIMEUUID)));
+    }
+
+    public Mono<ThreadTablePartitionKey> selectOneRow(ThreadId threadId, 
MessageId messageId) {
         return executor.executeSingleRow(
-                select.bind().set(MESSAGE_ID, ((CassandraMessageId) 
messageId).get(), TypeCodecs.TIMEUUID))
+                select.bind()
+                    .set(THREAD_ID, ((CassandraMessageId) 
threadId.getBaseMessageId()).get(), TypeCodecs.TIMEUUID)
+                    .set(MESSAGE_ID, ((CassandraMessageId) messageId).get(), 
TypeCodecs.TIMEUUID))
             .map(this::readRow);
     }
 
-    public Mono<Void> deleteOneRow(MessageId messageId) {
+    public Mono<Void> deleteOneRow(ThreadId threadId, MessageId messageId) {
         return executor.executeVoid(delete.bind()
+            .set(THREAD_ID, ((CassandraMessageId) 
threadId.getBaseMessageId()).get(), TypeCodecs.TIMEUUID)
             .set(MESSAGE_ID, ((CassandraMessageId) messageId).get(), 
TypeCodecs.TIMEUUID));
     }
 
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadDataDefinition.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadDataDefinition.java
index a0efdeae01..48ac6350e6 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadDataDefinition.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadDataDefinition.java
@@ -47,7 +47,8 @@ public interface CassandraThreadDataDefinition {
         .table(CassandraThreadLookupTable.TABLE_NAME)
         .comment("Thread table lookup by messageId, using for deletion thread 
data")
         .statement(statement -> types -> statement
-            .withPartitionKey(MESSAGE_ID, TIMEUUID)
+            .withPartitionKey(THREAD_ID, TIMEUUID)
+            .withClusteringColumn(MESSAGE_ID, TIMEUUID)
             .withColumn(USERNAME, TEXT)
             .withColumn(MIME_MESSAGE_IDS, frozenSetOf(INT)))
         .build();
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java
index 2d8f734161..3389bc1a2c 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java
@@ -22,7 +22,7 @@ package org.apache.james.mailbox.cassandra.table;
 import com.datastax.oss.driver.api.core.CqlIdentifier;
 
 public interface CassandraThreadLookupTable {
-    String TABLE_NAME = "thread_lookup_2";
+    String TABLE_NAME = "thread_lookup_3";
 
     CqlIdentifier MIME_MESSAGE_IDS = CqlIdentifier.fromCql("mimeMessageIds");
 }
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
index ee8bce8a78..34972330f2 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
@@ -653,7 +653,7 @@ public class CassandraMailboxManagerTest extends 
MailboxManagerTest<CassandraMai
             saveThreadData(session.getUser(), mimeMessageIds, 
message.getId().getMessageId(), message.getThreadId(), Optional.of(new 
Subject("Test"))).block();
             CassandraMessageId cassandraMessageId = (CassandraMessageId) 
message.getId().getMessageId();
             ThreadTablePartitionKey partitionKey = 
threadLookupDAO(cassandraCluster)
-                .selectOneRow(cassandraMessageId).block();
+                .selectOneRow(message.getThreadId(), 
cassandraMessageId).block();
 
             mailboxManager.deleteMailbox(inbox, session);
 
@@ -663,7 +663,7 @@ public class CassandraMailboxManagerTest extends 
MailboxManagerTest<CassandraMai
                     .isEmpty();
 
                 softly.assertThat(threadLookupDAO(cassandraCluster)
-                    .selectOneRow(cassandraMessageId).block())
+                    .selectOneRow(message.getThreadId(), 
cassandraMessageId).block())
                     .isNull();
             });
         }
@@ -685,7 +685,7 @@ public class CassandraMailboxManagerTest extends 
MailboxManagerTest<CassandraMai
             saveThreadData(session.getUser(), mimeMessageIds, 
message.getId().getMessageId(), message.getThreadId(), Optional.of(new 
Subject("Test"))).block();
             CassandraMessageId cassandraMessageId = (CassandraMessageId) 
message.getId().getMessageId();
             ThreadTablePartitionKey partitionKey = 
threadLookupDAO(cassandraCluster)
-                .selectOneRow(cassandraMessageId).block();
+                .selectOneRow(message.getThreadId(), 
cassandraMessageId).block();
 
             cassandraCluster.getConf().registerScenario(fail()
                 .times(1)
@@ -702,7 +702,7 @@ public class CassandraMailboxManagerTest extends 
MailboxManagerTest<CassandraMai
                     .isEmpty();
 
                 softly.assertThat(threadLookupDAO(cassandraCluster)
-                        .selectOneRow(cassandraMessageId).block())
+                        .selectOneRow(message.getThreadId(), 
cassandraMessageId).block())
                     .isNull();
             });
         }
@@ -724,7 +724,7 @@ public class CassandraMailboxManagerTest extends 
MailboxManagerTest<CassandraMai
             saveThreadData(session.getUser(), mimeMessageIds, 
message.getId().getMessageId(), message.getThreadId(), Optional.of(new 
Subject("Test"))).block();
             CassandraMessageId cassandraMessageId = (CassandraMessageId) 
message.getId().getMessageId();
             ThreadTablePartitionKey partitionKey = 
threadLookupDAO(cassandraCluster)
-                .selectOneRow(cassandraMessageId).block();
+                .selectOneRow(message.getThreadId(), 
cassandraMessageId).block();
 
             inboxManager.delete(ImmutableList.of(message.getId().getUid()), 
session);
 
@@ -734,7 +734,7 @@ public class CassandraMailboxManagerTest extends 
MailboxManagerTest<CassandraMai
                     .isEmpty();
 
                 softly.assertThat(threadLookupDAO(cassandraCluster)
-                        .selectOneRow(cassandraMessageId).block())
+                        .selectOneRow(message.getThreadId(), 
cassandraMessageId).block())
                     .isNull();
             });
         }
@@ -756,7 +756,7 @@ public class CassandraMailboxManagerTest extends 
MailboxManagerTest<CassandraMai
             saveThreadData(session.getUser(), mimeMessageIds, 
message.getId().getMessageId(), message.getThreadId(), Optional.of(new 
Subject("Test"))).block();
             CassandraMessageId cassandraMessageId = (CassandraMessageId) 
message.getId().getMessageId();
             ThreadTablePartitionKey partitionKey = 
threadLookupDAO(cassandraCluster)
-                .selectOneRow(cassandraMessageId).block();
+                .selectOneRow(message.getThreadId(), 
cassandraMessageId).block();
 
             cassandraCluster.getConf().registerScenario(fail()
                 .times(1)
@@ -773,7 +773,7 @@ public class CassandraMailboxManagerTest extends 
MailboxManagerTest<CassandraMai
                     .isEmpty();
 
                 softly.assertThat(threadLookupDAO(cassandraCluster)
-                        .selectOneRow(cassandraMessageId).block())
+                        .selectOneRow(message.getThreadId(), 
cassandraMessageId).block())
                     .isNull();
             });
         }
@@ -847,7 +847,7 @@ public class CassandraMailboxManagerTest extends 
MailboxManagerTest<CassandraMai
             return threadDAO(cassandra.getCassandraCluster())
                 .insertSome(username, hashMimeMessagesIds(mimeMessageIds), 
messageId, threadId, hashSubject(baseSubject))
                 .then(threadLookupDAO(cassandra.getCassandraCluster())
-                    .insert(messageId, username, 
hashMimeMessagesIds(mimeMessageIds)));
+                    .insert(messageId, threadId, username, 
hashMimeMessagesIds(mimeMessageIds)));
         }
 
         private Set<MimeMessageId> 
buildMimeMessageIdSet(Optional<MimeMessageId> mimeMessageId, 
Optional<MimeMessageId> inReplyTo, Optional<List<MimeMessageId>> references) {
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithmTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithmTest.java
index dceb9d277e..27af99d95f 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithmTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithmTest.java
@@ -39,6 +39,7 @@ import 
org.apache.james.mailbox.cassandra.mail.CassandraThreadLookupDAO;
 import org.apache.james.mailbox.cassandra.mail.MailboxAggregateModule;
 import org.apache.james.mailbox.cassandra.mail.ThreadTablePartitionKey;
 import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.ThreadId;
 import org.apache.james.mailbox.store.CombinationManagerTestSystem;
 import org.apache.james.mailbox.store.ThreadIdGuessingAlgorithmContract;
 import org.apache.james.mailbox.store.mail.ThreadIdGuessingAlgorithm;
@@ -68,7 +69,8 @@ public class CassandraThreadIdGuessingAlgorithmTest extends 
ThreadIdGuessingAlgo
     protected ThreadIdGuessingAlgorithm 
initThreadIdGuessingAlgorithm(CombinationManagerTestSystem testingData) {
         CassandraThreadDAO threadDAO = new 
CassandraThreadDAO(cassandraCluster.getCassandraCluster().getConf());
         threadLookupDAO = new 
CassandraThreadLookupDAO(cassandraCluster.getCassandraCluster().getConf());
-        return new 
CassandraThreadIdGuessingAlgorithm(testingData.getMailboxManager(), threadDAO, 
threadLookupDAO);
+
+        return new 
CassandraThreadIdGuessingAlgorithm(testingData.getMessageIdManager(), 
threadDAO, threadLookupDAO);
     }
 
     @Override
@@ -94,7 +96,7 @@ public class CassandraThreadIdGuessingAlgorithmTest extends 
ThreadIdGuessingAlgo
             Optional.of(new MimeMessageId("someInReplyTo")),
             Optional.of(List.of(new MimeMessageId("someReferences"), new 
MimeMessageId("Message-ID1"))));
 
-        assertThat(threadLookupDAO.selectOneRow(newBasedMessageId).block())
+        
assertThat(threadLookupDAO.selectOneRow(ThreadId.fromBaseMessageId(newBasedMessageId),
 newBasedMessageId).block())
             .isEqualTo(new ThreadTablePartitionKey(username, 
hashMimeMessagesIds(mimeMessageIds)));
     }
 }
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java
index 9e8c1f0af3..e0cc486291 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java
@@ -20,8 +20,9 @@
 package org.apache.james.mailbox.cassandra.mail;
 
 import static 
org.apache.james.mailbox.cassandra.mail.CassandraThreadDAOTest.hashMimeMessagesIds;
-import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.List;
 import java.util.Set;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
@@ -29,6 +30,8 @@ import 
org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
 import 
org.apache.james.mailbox.cassandra.modules.CassandraThreadDataDefinition;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.ThreadId;
 import org.apache.james.mailbox.store.mail.model.MimeMessageId;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -44,6 +47,7 @@ class CassandraThreadLookupDAOTest {
     private CassandraThreadLookupDAO testee;
     private CassandraMessageId messageId1;
     private CassandraMessageId messageId2;
+    private CassandraMessageId messageId3;
     private MimeMessageId mimeMessageId1;
     private MimeMessageId mimeMessageId2;
     private MimeMessageId mimeMessageId3;
@@ -56,6 +60,7 @@ class CassandraThreadLookupDAOTest {
         CassandraMessageId.Factory messageIdFactory = new 
CassandraMessageId.Factory();
         messageId1 = messageIdFactory.generate();
         messageId2 = messageIdFactory.generate();
+        messageId3 = messageIdFactory.generate();
 
         mimeMessageId1 = new MimeMessageId("MimeMessageID1");
         mimeMessageId2 = new MimeMessageId("MimeMessageID2");
@@ -65,45 +70,57 @@ class CassandraThreadLookupDAOTest {
 
     @Test
     void insertShouldSuccess() {
-        testee.insert(messageId1, ALICE, 
hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block();
+        testee.insert(messageId1, ThreadId.fromBaseMessageId(messageId1), 
ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block();
 
-        assertThat(testee.selectOneRow(messageId1).block())
+        assertThat(testee.selectOneRow(ThreadId.fromBaseMessageId(messageId1), 
messageId1).block())
             .isEqualTo(new ThreadTablePartitionKey(ALICE, 
hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))));
     }
 
     @Test
     void selectShouldReturnNullWhenMessageIdNonExist() {
-        assertThat(testee.selectOneRow(messageId1).block())
+        assertThat(testee.selectOneRow(ThreadId.fromBaseMessageId(messageId1), 
messageId1).block())
             .isNull();
     }
 
     @Test
     void selectShouldReturnOnlyRelatedDataByThatMessageId() {
-        testee.insert(messageId1, ALICE, 
hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block();
-        testee.insert(messageId2, BOB, 
hashMimeMessagesIds(Set.of(mimeMessageId3, mimeMessageId4))).block();
+        testee.insert(messageId1, ThreadId.fromBaseMessageId(messageId1), 
ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block();
+        testee.insert(messageId2, ThreadId.fromBaseMessageId(messageId2), BOB, 
hashMimeMessagesIds(Set.of(mimeMessageId3, mimeMessageId4))).block();
 
-        assertThat(testee.selectOneRow(messageId1).block())
+        assertThat(testee.selectOneRow(ThreadId.fromBaseMessageId(messageId1), 
messageId1).block())
             .isEqualTo(new ThreadTablePartitionKey(ALICE, 
hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))));
     }
 
+    @Test
+    void selectAllShouldReturnThreadContent() {
+        testee.insert(messageId1, ThreadId.fromBaseMessageId(messageId1), 
ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block();
+        testee.insert(messageId2, ThreadId.fromBaseMessageId(messageId2), BOB, 
hashMimeMessagesIds(Set.of(mimeMessageId3, mimeMessageId4))).block();
+        testee.insert(messageId3, ThreadId.fromBaseMessageId(messageId1), BOB, 
hashMimeMessagesIds(Set.of(mimeMessageId3, mimeMessageId4))).block();
+
+        List<MessageId> block = 
testee.selectAll(ThreadId.fromBaseMessageId(messageId1)).collectList().block();
+        assertThat(block)
+            .containsOnly(messageId1, messageId3)
+            .doesNotContain(messageId2);
+    }
+
     @Test
     void deletedEntriesShouldNotBeReturned() {
-        testee.insert(messageId1, ALICE, 
hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block();
+        testee.insert(messageId1, ThreadId.fromBaseMessageId(messageId1), 
ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block();
 
-        testee.deleteOneRow(messageId1).block();
+        testee.deleteOneRow(ThreadId.fromBaseMessageId(messageId1), 
messageId1).block();
 
-        assertThat(testee.selectOneRow(messageId1).block())
+        assertThat(testee.selectOneRow(ThreadId.fromBaseMessageId(messageId1), 
messageId1).block())
             .isNull();
     }
 
     @Test
     void deleteByNonExistMessageIdShouldDeleteNothing() {
-        testee.insert(messageId1, ALICE, 
hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block();
+        testee.insert(messageId1, ThreadId.fromBaseMessageId(messageId1), 
ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block();
 
-        testee.deleteOneRow(messageId2).block();
+        testee.deleteOneRow(ThreadId.fromBaseMessageId(messageId2), 
messageId2).block();
 
         // message1's data should remain
-        assertThat(testee.selectOneRow(messageId1).block())
+        assertThat(testee.selectOneRow(ThreadId.fromBaseMessageId(messageId1), 
messageId1).block())
             .isEqualTo(new ThreadTablePartitionKey(ALICE, 
hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))));
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org


Reply via email to