This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new dad460732c JAMES-3516 Rely on Cassandra for Thread lookups (#2750) dad460732c is described below commit dad460732c3ff5bfcb98904051f4c863c84899cb Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Fri Jun 27 15:22:00 2025 +0200 JAMES-3516 Rely on Cassandra for Thread lookups (#2750) - Reorganise the lookup DAO in order to include the ThreadId as part of the primary key and relegate the messageId as a clustering key. Acceptable for our usage pattern, this enables listing content of a thread. Migration strategy: none. Truncate the old v2 table and start with an empty v3 table which is acceptable. - Rely on this method for thread resolution instead of the search index - Pay an attention that messages in the tread are accessible to the end user (right resolution) --- .../CassandraThreadIdGuessingAlgorithm.java | 43 +++++++++++++--------- .../mailbox/cassandra/DeleteMessageListener.java | 33 +++++++---------- .../mailbox/cassandra/ids/CassandraMessageId.java | 4 ++ .../cassandra/mail/CassandraThreadLookupDAO.java | 35 +++++++++++++++--- .../modules/CassandraThreadDataDefinition.java | 3 +- .../table/CassandraThreadLookupTable.java | 2 +- .../cassandra/CassandraMailboxManagerTest.java | 18 ++++----- .../CassandraThreadIdGuessingAlgorithmTest.java | 6 ++- .../mail/CassandraThreadLookupDAOTest.java | 43 +++++++++++++++------- 9 files changed, 119 insertions(+), 68 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithm.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithm.java index 812191c7cd..abea0e7884 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithm.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithm.java @@ -19,23 +19,26 @@ package org.apache.james.mailbox.cassandra; +import java.util.Collection; +import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.function.BiConsumer; import java.util.stream.Collectors; import jakarta.inject.Inject; import org.apache.commons.lang3.tuple.Pair; -import org.apache.james.mailbox.MailboxManager; import org.apache.james.mailbox.MailboxSession; +import org.apache.james.mailbox.MessageIdManager; import org.apache.james.mailbox.cassandra.mail.CassandraThreadDAO; import org.apache.james.mailbox.cassandra.mail.CassandraThreadLookupDAO; import org.apache.james.mailbox.exception.ThreadNotFoundException; +import org.apache.james.mailbox.model.FetchGroup; import org.apache.james.mailbox.model.MessageId; -import org.apache.james.mailbox.model.MultimailboxesSearchQuery; -import org.apache.james.mailbox.model.SearchQuery; +import org.apache.james.mailbox.model.MessageResult; import org.apache.james.mailbox.model.ThreadId; import org.apache.james.mailbox.store.mail.ThreadIdGuessingAlgorithm; import org.apache.james.mailbox.store.mail.model.MimeMessageId; @@ -46,17 +49,18 @@ import com.google.common.hash.Hashing; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.SynchronousSink; public class CassandraThreadIdGuessingAlgorithm implements ThreadIdGuessingAlgorithm { private static final boolean DISABLE_THREADS = Boolean.valueOf(System.getProperty("james.mailbox.threads.disable", "false")); - private final MailboxManager mailboxManager; + private final MessageIdManager messageIdManager; private final CassandraThreadDAO threadDAO; private final CassandraThreadLookupDAO threadLookupDAO; @Inject - public CassandraThreadIdGuessingAlgorithm(MailboxManager mailboxManager, CassandraThreadDAO threadDAO, CassandraThreadLookupDAO threadLookupDAO) { - this.mailboxManager = mailboxManager; + public CassandraThreadIdGuessingAlgorithm(MessageIdManager messageIdManager, CassandraThreadDAO threadDAO, CassandraThreadLookupDAO threadLookupDAO) { + this.messageIdManager = messageIdManager; this.threadDAO = threadDAO; this.threadLookupDAO = threadLookupDAO; } @@ -78,7 +82,7 @@ public class CassandraThreadIdGuessingAlgorithm implements ThreadIdGuessingAlgor .switchIfEmpty(Mono.just(ThreadId.fromBaseMessageId(messageId))) .flatMap(threadId -> threadDAO .insertSome(session.getUser(), hashMimeMessageIds, messageId, threadId, hashBaseSubject) - .then(threadLookupDAO.insert(messageId, session.getUser(), hashMimeMessageIds)) + .then(threadLookupDAO.insert(messageId, threadId, session.getUser(), hashMimeMessageIds)) .then(Mono.just(threadId))); } @@ -88,16 +92,21 @@ public class CassandraThreadIdGuessingAlgorithm implements ThreadIdGuessingAlgor return Flux.just(threadId.getBaseMessageId()); } - SearchQuery searchQuery = SearchQuery.builder() - .andCriteria(SearchQuery.threadId(threadId)) - .sorts(new SearchQuery.Sort(SearchQuery.Sort.SortClause.Arrival, SearchQuery.Sort.Order.NATURAL)) - .build(); - - MultimailboxesSearchQuery expression = MultimailboxesSearchQuery - .from(searchQuery) - .build(); - - return Flux.from(mailboxManager.search(expression, session, Integer.MAX_VALUE)) + BiConsumer<Collection<MessageId>, SynchronousSink<Collection<MessageId>>> throwIfEmpty = (ids, sink) -> { + // Handle umpopulated lookup dao + if (ids.isEmpty()) { + sink.error(new ThreadNotFoundException(threadId)); + } else { + sink.next(ids); + } + }; + return threadLookupDAO.selectAll(threadId) + .collectList() + .handle(throwIfEmpty) + .flatMapMany(messageIds -> Flux.from(messageIdManager.getMessagesReactive(messageIds, FetchGroup.MINIMAL, session))) + .sort(Comparator.comparing(MessageResult::getInternalDate).thenComparing(m -> m.getMessageId().serialize())) + .map(MessageResult::getMessageId) + .distinct() .switchIfEmpty(Mono.error(() -> new ThreadNotFoundException(threadId))); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java index e01cc966d1..7e0a113607 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java @@ -50,20 +50,18 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMailboxRecentsDAO; import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV3; import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdDAO; import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO; -import org.apache.james.mailbox.cassandra.mail.CassandraMessageMetadata; import org.apache.james.mailbox.cassandra.mail.CassandraThreadDAO; import org.apache.james.mailbox.cassandra.mail.CassandraThreadLookupDAO; import org.apache.james.mailbox.cassandra.mail.CassandraUserMailboxRightsDAO; import org.apache.james.mailbox.cassandra.mail.MessageRepresentation; import org.apache.james.mailbox.events.MailboxEvents.Expunged; import org.apache.james.mailbox.events.MailboxEvents.MailboxDeletion; -import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; import org.apache.james.mailbox.model.MailboxACL; import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.mailbox.model.MessageId; -import org.apache.james.mailbox.model.MessageMetaData; import org.apache.james.mailbox.model.MessageRange; +import org.apache.james.mailbox.model.ThreadId; import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.util.streams.Limit; import org.reactivestreams.Publisher; @@ -232,11 +230,10 @@ public class DeleteMessageListener implements EventListener.ReactiveGroupEventLi int prefetch = 1; return Flux.mergeDelayError(prefetch, messageIdDAO.retrieveMessages(mailboxId, MessageRange.all(), Limit.unlimited()) - .map(CassandraMessageMetadata::getComposedMessageId) - .map(ComposedMessageIdWithMetaData::getComposedMessageId) - .concatMap(metadata -> handleMessageDeletionAsPartOfMailboxDeletion((CassandraMessageId) metadata.getMessageId(), mailboxId, path.getUser()) - .then(imapUidDAO.delete((CassandraMessageId) metadata.getMessageId(), mailboxId)) - .then(messageIdDAO.delete(mailboxId, metadata.getUid()))), + .concatMap(metadata -> handleMessageDeletionAsPartOfMailboxDeletion((CassandraMessageId) metadata.getComposedMessageId().getComposedMessageId().getMessageId(), + metadata.getComposedMessageId().getThreadId(), mailboxId, path.getUser()) + .then(imapUidDAO.delete((CassandraMessageId) metadata.getComposedMessageId().getComposedMessageId().getMessageId(), mailboxId)) + .then(messageIdDAO.delete(mailboxId, metadata.getComposedMessageId().getComposedMessageId().getUid()))), deleteAcl(mailboxId), applicableFlagDAO.delete(mailboxId), firstUnseenDAO.removeAll(mailboxId), @@ -247,11 +244,9 @@ public class DeleteMessageListener implements EventListener.ReactiveGroupEventLi } private Mono<Void> handleMessageDeletion(Expunged expunged) { - return Flux.fromIterable(expunged.getExpunged() - .values()) - .map(MessageMetaData::getMessageId) - .map(CassandraMessageId.class::cast) - .concatMap(messageId -> handleMessageDeletion(messageId, expunged.getMailboxId(), expunged.getMailboxPath().getUser())) + return Flux.fromIterable(expunged.getExpunged().values()) + .concatMap(metaData -> handleMessageDeletion((CassandraMessageId) metaData.getMessageId(), + expunged.getMailboxId(), metaData.getThreadId(), expunged.getMailboxPath().getUser())) .then(); } @@ -261,7 +256,7 @@ public class DeleteMessageListener implements EventListener.ReactiveGroupEventLi .then(aclMapper.delete(mailboxId))); } - private Mono<Void> handleMessageDeletion(CassandraMessageId messageId, MailboxId mailboxId, Username owner) { + private Mono<Void> handleMessageDeletion(CassandraMessageId messageId, MailboxId mailboxId, ThreadId threadId, Username owner) { return Mono.just(messageId) .filterWhen(this::isReferenced) .flatMap(id -> readMessage(id) @@ -269,13 +264,13 @@ public class DeleteMessageListener implements EventListener.ReactiveGroupEventLi .flatMap(message -> deleteUnreferencedAttachments(message).thenReturn(message)) .flatMap(this::deleteMessageBlobs) .then(messageDAOV3.delete(messageId)) - .then(threadLookupDAO.selectOneRow(messageId) + .then(threadLookupDAO.selectOneRow(threadId, messageId) .flatMap(key -> threadDAO.deleteSome(key.getUsername(), key.getMimeMessageIds()) .collectList())) - .then(threadLookupDAO.deleteOneRow(messageId))); + .then(threadLookupDAO.deleteOneRow(threadId, messageId))); } - private Mono<Void> handleMessageDeletionAsPartOfMailboxDeletion(CassandraMessageId messageId, CassandraId excludedId, Username owner) { + private Mono<Void> handleMessageDeletionAsPartOfMailboxDeletion(CassandraMessageId messageId, ThreadId threadId, CassandraId excludedId, Username owner) { return Mono.just(messageId) .filterWhen(id -> isReferenced(id, excludedId)) .flatMap(id -> readMessage(id) @@ -283,10 +278,10 @@ public class DeleteMessageListener implements EventListener.ReactiveGroupEventLi .flatMap(message -> deleteUnreferencedAttachments(message).thenReturn(message)) .flatMap(this::deleteMessageBlobs) .then(messageDAOV3.delete(messageId)) - .then(threadLookupDAO.selectOneRow(messageId) + .then(threadLookupDAO.selectOneRow(threadId, messageId) .flatMap(key -> threadDAO.deleteSome(key.getUsername(), key.getMimeMessageIds()) .collectList())) - .then(threadLookupDAO.deleteOneRow(messageId))); + .then(threadLookupDAO.deleteOneRow(threadId, messageId))); } private Mono<MessageRepresentation> deleteMessageBlobs(MessageRepresentation message) { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ids/CassandraMessageId.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ids/CassandraMessageId.java index cdfafa706a..9c9913305a 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ids/CassandraMessageId.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/ids/CassandraMessageId.java @@ -46,6 +46,10 @@ public class CassandraMessageId implements MessageId { } } + public static CassandraMessageId of(UUID uuid) { + return new CassandraMessageId(uuid); + } + private final UUID uuid; private CassandraMessageId(UUID uuid) { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java index f910c928f8..a3ba779a08 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java @@ -30,6 +30,7 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSA import static org.apache.james.mailbox.cassandra.table.CassandraThreadLookupTable.MIME_MESSAGE_IDS; import static org.apache.james.mailbox.cassandra.table.CassandraThreadLookupTable.TABLE_NAME; import static org.apache.james.mailbox.cassandra.table.CassandraThreadTable.USERNAME; +import static org.apache.james.mailbox.cassandra.table.MessageIdToImapUid.THREAD_ID; import java.util.Set; @@ -39,6 +40,7 @@ import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.core.Username; import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; import org.apache.james.mailbox.model.MessageId; +import org.apache.james.mailbox.model.ThreadId; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.PreparedStatement; @@ -48,6 +50,7 @@ import com.datastax.oss.driver.api.core.type.codec.TypeCodecs; import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry; import com.google.common.collect.ImmutableSet; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class CassandraThreadLookupDAO { @@ -56,6 +59,7 @@ public class CassandraThreadLookupDAO { private final CassandraAsyncExecutor executor; private final PreparedStatement insert; private final PreparedStatement select; + private final PreparedStatement selectAll; private final PreparedStatement delete; @Inject @@ -64,35 +68,54 @@ public class CassandraThreadLookupDAO { insert = session.prepare(insertInto(TABLE_NAME) .value(MESSAGE_ID, bindMarker(MESSAGE_ID)) + .value(THREAD_ID, bindMarker(THREAD_ID)) .value(USERNAME, bindMarker(USERNAME)) .value(MIME_MESSAGE_IDS, bindMarker(MIME_MESSAGE_IDS)) .build()); select = session.prepare(selectFrom(TABLE_NAME) .columns(USERNAME, MIME_MESSAGE_IDS) - .where(column(MESSAGE_ID).isEqualTo(bindMarker(MESSAGE_ID))) + .where(column(MESSAGE_ID).isEqualTo(bindMarker(MESSAGE_ID)), + column(THREAD_ID).isEqualTo(bindMarker(THREAD_ID))) + .build()); + + selectAll = session.prepare(selectFrom(TABLE_NAME) + .columns(MESSAGE_ID) + .where(column(THREAD_ID).isEqualTo(bindMarker(THREAD_ID))) .build()); delete = session.prepare(deleteFrom(TABLE_NAME) - .where(column(MESSAGE_ID).isEqualTo(bindMarker(MESSAGE_ID))) + .where(column(MESSAGE_ID).isEqualTo(bindMarker(MESSAGE_ID)), + column(THREAD_ID).isEqualTo(bindMarker(THREAD_ID))) .build()); } - public Mono<Void> insert(MessageId messageId, Username username, Set<Integer> hashMimeMessageIds) { + public Mono<Void> insert(MessageId messageId, ThreadId threadId, Username username, Set<Integer> hashMimeMessageIds) { return executor.executeVoid(insert.bind() .set(MESSAGE_ID, ((CassandraMessageId) messageId).get(), TypeCodecs.TIMEUUID) + .set(THREAD_ID, ((CassandraMessageId) threadId.getBaseMessageId()).get(), TypeCodecs.TIMEUUID) .set(USERNAME, username.asString(), TypeCodecs.TEXT) .set(MIME_MESSAGE_IDS, hashMimeMessageIds, SET_OF_INTS_CODEC)); } - public Mono<ThreadTablePartitionKey> selectOneRow(MessageId messageId) { + public Flux<MessageId> selectAll(ThreadId threadId) { + return executor.executeRows( + selectAll.bind() + .set(THREAD_ID, ((CassandraMessageId) threadId.getBaseMessageId()).get(), TypeCodecs.TIMEUUID)) + .map(row -> CassandraMessageId.of(row.get(MESSAGE_ID, TypeCodecs.TIMEUUID))); + } + + public Mono<ThreadTablePartitionKey> selectOneRow(ThreadId threadId, MessageId messageId) { return executor.executeSingleRow( - select.bind().set(MESSAGE_ID, ((CassandraMessageId) messageId).get(), TypeCodecs.TIMEUUID)) + select.bind() + .set(THREAD_ID, ((CassandraMessageId) threadId.getBaseMessageId()).get(), TypeCodecs.TIMEUUID) + .set(MESSAGE_ID, ((CassandraMessageId) messageId).get(), TypeCodecs.TIMEUUID)) .map(this::readRow); } - public Mono<Void> deleteOneRow(MessageId messageId) { + public Mono<Void> deleteOneRow(ThreadId threadId, MessageId messageId) { return executor.executeVoid(delete.bind() + .set(THREAD_ID, ((CassandraMessageId) threadId.getBaseMessageId()).get(), TypeCodecs.TIMEUUID) .set(MESSAGE_ID, ((CassandraMessageId) messageId).get(), TypeCodecs.TIMEUUID)); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadDataDefinition.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadDataDefinition.java index a0efdeae01..48ac6350e6 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadDataDefinition.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadDataDefinition.java @@ -47,7 +47,8 @@ public interface CassandraThreadDataDefinition { .table(CassandraThreadLookupTable.TABLE_NAME) .comment("Thread table lookup by messageId, using for deletion thread data") .statement(statement -> types -> statement - .withPartitionKey(MESSAGE_ID, TIMEUUID) + .withPartitionKey(THREAD_ID, TIMEUUID) + .withClusteringColumn(MESSAGE_ID, TIMEUUID) .withColumn(USERNAME, TEXT) .withColumn(MIME_MESSAGE_IDS, frozenSetOf(INT))) .build(); diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java index 2d8f734161..3389bc1a2c 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java @@ -22,7 +22,7 @@ package org.apache.james.mailbox.cassandra.table; import com.datastax.oss.driver.api.core.CqlIdentifier; public interface CassandraThreadLookupTable { - String TABLE_NAME = "thread_lookup_2"; + String TABLE_NAME = "thread_lookup_3"; CqlIdentifier MIME_MESSAGE_IDS = CqlIdentifier.fromCql("mimeMessageIds"); } diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java index ee8bce8a78..34972330f2 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java @@ -653,7 +653,7 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai saveThreadData(session.getUser(), mimeMessageIds, message.getId().getMessageId(), message.getThreadId(), Optional.of(new Subject("Test"))).block(); CassandraMessageId cassandraMessageId = (CassandraMessageId) message.getId().getMessageId(); ThreadTablePartitionKey partitionKey = threadLookupDAO(cassandraCluster) - .selectOneRow(cassandraMessageId).block(); + .selectOneRow(message.getThreadId(), cassandraMessageId).block(); mailboxManager.deleteMailbox(inbox, session); @@ -663,7 +663,7 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai .isEmpty(); softly.assertThat(threadLookupDAO(cassandraCluster) - .selectOneRow(cassandraMessageId).block()) + .selectOneRow(message.getThreadId(), cassandraMessageId).block()) .isNull(); }); } @@ -685,7 +685,7 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai saveThreadData(session.getUser(), mimeMessageIds, message.getId().getMessageId(), message.getThreadId(), Optional.of(new Subject("Test"))).block(); CassandraMessageId cassandraMessageId = (CassandraMessageId) message.getId().getMessageId(); ThreadTablePartitionKey partitionKey = threadLookupDAO(cassandraCluster) - .selectOneRow(cassandraMessageId).block(); + .selectOneRow(message.getThreadId(), cassandraMessageId).block(); cassandraCluster.getConf().registerScenario(fail() .times(1) @@ -702,7 +702,7 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai .isEmpty(); softly.assertThat(threadLookupDAO(cassandraCluster) - .selectOneRow(cassandraMessageId).block()) + .selectOneRow(message.getThreadId(), cassandraMessageId).block()) .isNull(); }); } @@ -724,7 +724,7 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai saveThreadData(session.getUser(), mimeMessageIds, message.getId().getMessageId(), message.getThreadId(), Optional.of(new Subject("Test"))).block(); CassandraMessageId cassandraMessageId = (CassandraMessageId) message.getId().getMessageId(); ThreadTablePartitionKey partitionKey = threadLookupDAO(cassandraCluster) - .selectOneRow(cassandraMessageId).block(); + .selectOneRow(message.getThreadId(), cassandraMessageId).block(); inboxManager.delete(ImmutableList.of(message.getId().getUid()), session); @@ -734,7 +734,7 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai .isEmpty(); softly.assertThat(threadLookupDAO(cassandraCluster) - .selectOneRow(cassandraMessageId).block()) + .selectOneRow(message.getThreadId(), cassandraMessageId).block()) .isNull(); }); } @@ -756,7 +756,7 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai saveThreadData(session.getUser(), mimeMessageIds, message.getId().getMessageId(), message.getThreadId(), Optional.of(new Subject("Test"))).block(); CassandraMessageId cassandraMessageId = (CassandraMessageId) message.getId().getMessageId(); ThreadTablePartitionKey partitionKey = threadLookupDAO(cassandraCluster) - .selectOneRow(cassandraMessageId).block(); + .selectOneRow(message.getThreadId(), cassandraMessageId).block(); cassandraCluster.getConf().registerScenario(fail() .times(1) @@ -773,7 +773,7 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai .isEmpty(); softly.assertThat(threadLookupDAO(cassandraCluster) - .selectOneRow(cassandraMessageId).block()) + .selectOneRow(message.getThreadId(), cassandraMessageId).block()) .isNull(); }); } @@ -847,7 +847,7 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai return threadDAO(cassandra.getCassandraCluster()) .insertSome(username, hashMimeMessagesIds(mimeMessageIds), messageId, threadId, hashSubject(baseSubject)) .then(threadLookupDAO(cassandra.getCassandraCluster()) - .insert(messageId, username, hashMimeMessagesIds(mimeMessageIds))); + .insert(messageId, threadId, username, hashMimeMessagesIds(mimeMessageIds))); } private Set<MimeMessageId> buildMimeMessageIdSet(Optional<MimeMessageId> mimeMessageId, Optional<MimeMessageId> inReplyTo, Optional<List<MimeMessageId>> references) { diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithmTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithmTest.java index dceb9d277e..27af99d95f 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithmTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithmTest.java @@ -39,6 +39,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraThreadLookupDAO; import org.apache.james.mailbox.cassandra.mail.MailboxAggregateModule; import org.apache.james.mailbox.cassandra.mail.ThreadTablePartitionKey; import org.apache.james.mailbox.model.MessageId; +import org.apache.james.mailbox.model.ThreadId; import org.apache.james.mailbox.store.CombinationManagerTestSystem; import org.apache.james.mailbox.store.ThreadIdGuessingAlgorithmContract; import org.apache.james.mailbox.store.mail.ThreadIdGuessingAlgorithm; @@ -68,7 +69,8 @@ public class CassandraThreadIdGuessingAlgorithmTest extends ThreadIdGuessingAlgo protected ThreadIdGuessingAlgorithm initThreadIdGuessingAlgorithm(CombinationManagerTestSystem testingData) { CassandraThreadDAO threadDAO = new CassandraThreadDAO(cassandraCluster.getCassandraCluster().getConf()); threadLookupDAO = new CassandraThreadLookupDAO(cassandraCluster.getCassandraCluster().getConf()); - return new CassandraThreadIdGuessingAlgorithm(testingData.getMailboxManager(), threadDAO, threadLookupDAO); + + return new CassandraThreadIdGuessingAlgorithm(testingData.getMessageIdManager(), threadDAO, threadLookupDAO); } @Override @@ -94,7 +96,7 @@ public class CassandraThreadIdGuessingAlgorithmTest extends ThreadIdGuessingAlgo Optional.of(new MimeMessageId("someInReplyTo")), Optional.of(List.of(new MimeMessageId("someReferences"), new MimeMessageId("Message-ID1")))); - assertThat(threadLookupDAO.selectOneRow(newBasedMessageId).block()) + assertThat(threadLookupDAO.selectOneRow(ThreadId.fromBaseMessageId(newBasedMessageId), newBasedMessageId).block()) .isEqualTo(new ThreadTablePartitionKey(username, hashMimeMessagesIds(mimeMessageIds))); } } diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java index 9e8c1f0af3..e0cc486291 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java @@ -20,8 +20,9 @@ package org.apache.james.mailbox.cassandra.mail; import static org.apache.james.mailbox.cassandra.mail.CassandraThreadDAOTest.hashMimeMessagesIds; -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.Assertions.assertThat; +import java.util.List; import java.util.Set; import org.apache.james.backends.cassandra.CassandraCluster; @@ -29,6 +30,8 @@ import org.apache.james.backends.cassandra.CassandraClusterExtension; import org.apache.james.core.Username; import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; import org.apache.james.mailbox.cassandra.modules.CassandraThreadDataDefinition; +import org.apache.james.mailbox.model.MessageId; +import org.apache.james.mailbox.model.ThreadId; import org.apache.james.mailbox.store.mail.model.MimeMessageId; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -44,6 +47,7 @@ class CassandraThreadLookupDAOTest { private CassandraThreadLookupDAO testee; private CassandraMessageId messageId1; private CassandraMessageId messageId2; + private CassandraMessageId messageId3; private MimeMessageId mimeMessageId1; private MimeMessageId mimeMessageId2; private MimeMessageId mimeMessageId3; @@ -56,6 +60,7 @@ class CassandraThreadLookupDAOTest { CassandraMessageId.Factory messageIdFactory = new CassandraMessageId.Factory(); messageId1 = messageIdFactory.generate(); messageId2 = messageIdFactory.generate(); + messageId3 = messageIdFactory.generate(); mimeMessageId1 = new MimeMessageId("MimeMessageID1"); mimeMessageId2 = new MimeMessageId("MimeMessageID2"); @@ -65,45 +70,57 @@ class CassandraThreadLookupDAOTest { @Test void insertShouldSuccess() { - testee.insert(messageId1, ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block(); + testee.insert(messageId1, ThreadId.fromBaseMessageId(messageId1), ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block(); - assertThat(testee.selectOneRow(messageId1).block()) + assertThat(testee.selectOneRow(ThreadId.fromBaseMessageId(messageId1), messageId1).block()) .isEqualTo(new ThreadTablePartitionKey(ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2)))); } @Test void selectShouldReturnNullWhenMessageIdNonExist() { - assertThat(testee.selectOneRow(messageId1).block()) + assertThat(testee.selectOneRow(ThreadId.fromBaseMessageId(messageId1), messageId1).block()) .isNull(); } @Test void selectShouldReturnOnlyRelatedDataByThatMessageId() { - testee.insert(messageId1, ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block(); - testee.insert(messageId2, BOB, hashMimeMessagesIds(Set.of(mimeMessageId3, mimeMessageId4))).block(); + testee.insert(messageId1, ThreadId.fromBaseMessageId(messageId1), ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block(); + testee.insert(messageId2, ThreadId.fromBaseMessageId(messageId2), BOB, hashMimeMessagesIds(Set.of(mimeMessageId3, mimeMessageId4))).block(); - assertThat(testee.selectOneRow(messageId1).block()) + assertThat(testee.selectOneRow(ThreadId.fromBaseMessageId(messageId1), messageId1).block()) .isEqualTo(new ThreadTablePartitionKey(ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2)))); } + @Test + void selectAllShouldReturnThreadContent() { + testee.insert(messageId1, ThreadId.fromBaseMessageId(messageId1), ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block(); + testee.insert(messageId2, ThreadId.fromBaseMessageId(messageId2), BOB, hashMimeMessagesIds(Set.of(mimeMessageId3, mimeMessageId4))).block(); + testee.insert(messageId3, ThreadId.fromBaseMessageId(messageId1), BOB, hashMimeMessagesIds(Set.of(mimeMessageId3, mimeMessageId4))).block(); + + List<MessageId> block = testee.selectAll(ThreadId.fromBaseMessageId(messageId1)).collectList().block(); + assertThat(block) + .containsOnly(messageId1, messageId3) + .doesNotContain(messageId2); + } + @Test void deletedEntriesShouldNotBeReturned() { - testee.insert(messageId1, ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block(); + testee.insert(messageId1, ThreadId.fromBaseMessageId(messageId1), ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block(); - testee.deleteOneRow(messageId1).block(); + testee.deleteOneRow(ThreadId.fromBaseMessageId(messageId1), messageId1).block(); - assertThat(testee.selectOneRow(messageId1).block()) + assertThat(testee.selectOneRow(ThreadId.fromBaseMessageId(messageId1), messageId1).block()) .isNull(); } @Test void deleteByNonExistMessageIdShouldDeleteNothing() { - testee.insert(messageId1, ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block(); + testee.insert(messageId1, ThreadId.fromBaseMessageId(messageId1), ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block(); - testee.deleteOneRow(messageId2).block(); + testee.deleteOneRow(ThreadId.fromBaseMessageId(messageId2), messageId2).block(); // message1's data should remain - assertThat(testee.selectOneRow(messageId1).block()) + assertThat(testee.selectOneRow(ThreadId.fromBaseMessageId(messageId1), messageId1).block()) .isEqualTo(new ThreadTablePartitionKey(ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2)))); } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org