This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new b56acecd7d JAMES-3937 Use hash for CassandraThreadIdGuessingAlgorithm (#1727) b56acecd7d is described below commit b56acecd7d6f8cd09e14a31315e3285af18a50fc Author: Trần Hồng Quân <55171818+quantranhong1...@users.noreply.github.com> AuthorDate: Fri Sep 15 15:43:00 2023 +0700 JAMES-3937 Use hash for CassandraThreadIdGuessingAlgorithm (#1727) --- .../CassandraThreadIdGuessingAlgorithm.java | 21 ++++-- .../mailbox/cassandra/mail/CassandraThreadDAO.java | 26 +++---- .../cassandra/mail/CassandraThreadLookupDAO.java | 13 ++-- .../cassandra/mail/ThreadTablePartitionKey.java | 7 +- .../cassandra/modules/CassandraThreadModule.java | 7 +- .../table/CassandraThreadLookupTable.java | 2 +- .../cassandra/table/CassandraThreadTable.java | 2 +- .../cassandra/CassandraMailboxManagerTest.java | 6 +- .../CassandraThreadIdGuessingAlgorithmTest.java | 6 +- .../cassandra/mail/CassandraThreadDAOTest.java | 88 ++++++++++++++-------- .../mail/CassandraThreadLookupDAOTest.java | 19 ++--- upgrade-instructions.md | 12 +++ 12 files changed, 128 insertions(+), 81 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithm.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithm.java index 872fc5edb3..b61fb52e53 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithm.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithm.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; import javax.inject.Inject; @@ -41,6 +42,8 @@ import org.apache.james.mailbox.store.mail.model.MimeMessageId; import org.apache.james.mailbox.store.mail.model.Subject; import 
org.apache.james.mailbox.store.search.SearchUtil; +import com.google.common.hash.Hashing; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -58,16 +61,22 @@ public class CassandraThreadIdGuessingAlgorithm implements ThreadIdGuessingAlgor @Override public Mono<ThreadId> guessThreadIdReactive(MessageId messageId, Optional<MimeMessageId> mimeMessageId, Optional<MimeMessageId> inReplyTo, Optional<List<MimeMessageId>> references, Optional<Subject> subject, MailboxSession session) { - Set<MimeMessageId> mimeMessageIds = buildMimeMessageIdSet(mimeMessageId, inReplyTo, references); - Optional<Subject> baseSubject = subject.map(value -> new Subject(SearchUtil.getBaseSubject(value.getValue()))); - return Flux.from(threadDAO.selectSome(session.getUser(), mimeMessageIds)) - .filter(pair -> pair.getLeft().equals(baseSubject)) + Set<Integer> hashMimeMessageIds = buildMimeMessageIdSet(mimeMessageId, inReplyTo, references) + .stream() + .map(mimeMessageId1 -> Hashing.murmur3_32_fixed().hashBytes(mimeMessageId1.getValue().getBytes()).asInt()) + .collect(Collectors.toSet()); + + Optional<Integer> hashBaseSubject = subject.map(value -> new Subject(SearchUtil.getBaseSubject(value.getValue()))) + .map(subject1 -> Hashing.murmur3_32_fixed().hashBytes(subject1.getValue().getBytes()).asInt()); + + return Flux.from(threadDAO.selectSome(session.getUser(), hashMimeMessageIds)) + .filter(pair -> pair.getLeft().equals(hashBaseSubject)) .next() .map(Pair::getRight) .switchIfEmpty(Mono.just(ThreadId.fromBaseMessageId(messageId))) .flatMap(threadId -> threadDAO - .insertSome(session.getUser(), mimeMessageIds, messageId, threadId, baseSubject) - .then(threadLookupDAO.insert(messageId, session.getUser(), mimeMessageIds)) + .insertSome(session.getUser(), hashMimeMessageIds, messageId, threadId, hashBaseSubject) + .then(threadLookupDAO.insert(messageId, session.getUser(), hashMimeMessageIds)) .then(Mono.just(threadId))); } diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAO.java index 5af4b990c1..7a9af5a18c 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAO.java @@ -43,8 +43,6 @@ import org.apache.james.core.Username; import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.model.ThreadId; -import org.apache.james.mailbox.store.mail.model.MimeMessageId; -import org.apache.james.mailbox.store.mail.model.Subject; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.PreparedStatement; @@ -83,35 +81,35 @@ public class CassandraThreadDAO { .build()); } - public Flux<Void> insertSome(Username username, Set<MimeMessageId> mimeMessageIds, MessageId messageId, ThreadId threadId, Optional<Subject> baseSubject) { - return Flux.fromIterable(mimeMessageIds) + public Flux<Void> insertSome(Username username, Set<Integer> hashMimeMessageIds, MessageId messageId, ThreadId threadId, Optional<Integer> hashBaseSubject) { + return Flux.fromIterable(hashMimeMessageIds) .flatMap(mimeMessageId -> executor.executeVoid(insertOne.bind() .set(USERNAME, username.asString(), TypeCodecs.TEXT) - .set(MIME_MESSAGE_ID, mimeMessageId.getValue(), TypeCodecs.TEXT) + .set(MIME_MESSAGE_ID, mimeMessageId, TypeCodecs.INT) .set(MESSAGE_ID, ((CassandraMessageId) messageId).get(), TypeCodecs.TIMEUUID) .set(THREAD_ID, ((CassandraMessageId) threadId.getBaseMessageId()).get(), TypeCodecs.TIMEUUID) - .set(BASE_SUBJECT, baseSubject.map(Subject::getValue).orElse(null), TypeCodecs.TEXT)), DEFAULT_CONCURRENCY); + .set(BASE_SUBJECT, hashBaseSubject.orElse(null), TypeCodecs.INT)), DEFAULT_CONCURRENCY); } - public 
Flux<Pair<Optional<Subject>, ThreadId>> selectSome(Username username, Set<MimeMessageId> mimeMessageIds) { - return Flux.fromIterable(mimeMessageIds) + public Flux<Pair<Optional<Integer>, ThreadId>> selectSome(Username username, Set<Integer> hashMimeMessageIds) { + return Flux.fromIterable(hashMimeMessageIds) .flatMap(mimeMessageId -> executor .executeSingleRow(selectOne.bind() .set(USERNAME, username.asString(), TypeCodecs.TEXT) - .set(MIME_MESSAGE_ID, mimeMessageId.getValue(), TypeCodecs.TEXT)) + .set(MIME_MESSAGE_ID, mimeMessageId, TypeCodecs.INT)) .map(this::readRow), DEFAULT_CONCURRENCY) .distinct(); } - public Flux<Void> deleteSome(Username username, Set<MimeMessageId> mimeMessageIds) { - return Flux.fromIterable(mimeMessageIds) + public Flux<Void> deleteSome(Username username, Set<Integer> hashMimeMessageIds) { + return Flux.fromIterable(hashMimeMessageIds) .flatMap(mimeMessageId -> executor.executeVoid(deleteOne.bind() .set(USERNAME, username.asString(), TypeCodecs.TEXT) - .set(MIME_MESSAGE_ID, mimeMessageId.getValue(), TypeCodecs.TEXT))); + .set(MIME_MESSAGE_ID, mimeMessageId, TypeCodecs.INT))); } - public Pair<Optional<Subject>, ThreadId> readRow(Row row) { - return Pair.of(Optional.ofNullable(row.getString(BASE_SUBJECT)).map(Subject::new), + public Pair<Optional<Integer>, ThreadId> readRow(Row row) { + return Pair.of(Optional.ofNullable(row.get(BASE_SUBJECT, Integer.class)), ThreadId.fromBaseMessageId(CassandraMessageId.Factory.of(row.getUuid(THREAD_ID)))); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java index 5913c0622b..e95c7e9c00 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java @@ -19,7 +19,7 @@ package 
org.apache.james.mailbox.cassandra.mail; -import static com.datastax.oss.driver.api.core.type.DataTypes.TEXT; +import static com.datastax.oss.driver.api.core.type.DataTypes.INT; import static com.datastax.oss.driver.api.core.type.DataTypes.frozenSetOf; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom; @@ -39,7 +39,6 @@ import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.core.Username; import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; import org.apache.james.mailbox.model.MessageId; -import org.apache.james.mailbox.store.mail.model.MimeMessageId; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.PreparedStatement; @@ -52,7 +51,7 @@ import com.google.common.collect.ImmutableSet; import reactor.core.publisher.Mono; public class CassandraThreadLookupDAO { - private static final TypeCodec<Set<String>> SET_OF_STRINGS_CODEC = CodecRegistry.DEFAULT.codecFor(frozenSetOf(TEXT)); + private static final TypeCodec<Set<Integer>> SET_OF_INTS_CODEC = CodecRegistry.DEFAULT.codecFor(frozenSetOf(INT)); private final CassandraAsyncExecutor executor; private final PreparedStatement insert; @@ -79,12 +78,11 @@ public class CassandraThreadLookupDAO { .build()); } - public Mono<Void> insert(MessageId messageId, Username username, Set<MimeMessageId> mimeMessageIds) { - Set<String> mimeMessageIdsString = mimeMessageIds.stream().map(MimeMessageId::getValue).collect(ImmutableSet.toImmutableSet()); + public Mono<Void> insert(MessageId messageId, Username username, Set<Integer> hashMimeMessageIds) { return executor.executeVoid(insert.bind() .set(MESSAGE_ID, ((CassandraMessageId) messageId).get(), TypeCodecs.TIMEUUID) .set(USERNAME, username.asString(), TypeCodecs.TEXT) - .set(MIME_MESSAGE_IDS, mimeMessageIdsString, SET_OF_STRINGS_CODEC)); + .set(MIME_MESSAGE_IDS, 
hashMimeMessageIds, SET_OF_INTS_CODEC)); } public Mono<ThreadTablePartitionKey> selectOneRow(MessageId messageId) { @@ -99,9 +97,8 @@ public class CassandraThreadLookupDAO { } private ThreadTablePartitionKey readRow(Row row) { - Set<MimeMessageId> mimeMessageIds = row.get(MIME_MESSAGE_IDS, SET_OF_STRINGS_CODEC) + Set<Integer> mimeMessageIds = row.get(MIME_MESSAGE_IDS, SET_OF_INTS_CODEC) .stream() - .map(MimeMessageId::new) .collect(ImmutableSet.toImmutableSet()); return new ThreadTablePartitionKey(Username.of(row.getString(USERNAME)), mimeMessageIds); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/ThreadTablePartitionKey.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/ThreadTablePartitionKey.java index f6913580ec..86da94008b 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/ThreadTablePartitionKey.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/ThreadTablePartitionKey.java @@ -23,13 +23,12 @@ import java.util.Objects; import java.util.Set; import org.apache.james.core.Username; -import org.apache.james.mailbox.store.mail.model.MimeMessageId; public class ThreadTablePartitionKey { private final Username username; - private final Set<MimeMessageId> mimeMessageIds; + private final Set<Integer> mimeMessageIds; - public ThreadTablePartitionKey(Username username, Set<MimeMessageId> mimeMessageIds) { + public ThreadTablePartitionKey(Username username, Set<Integer> mimeMessageIds) { this.username = username; this.mimeMessageIds = mimeMessageIds; } @@ -38,7 +37,7 @@ public class ThreadTablePartitionKey { return username; } - public Set<MimeMessageId> getMimeMessageIds() { + public Set<Integer> getMimeMessageIds() { return mimeMessageIds; } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadModule.java 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadModule.java index b323be9419..efc53457d9 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadModule.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadModule.java @@ -19,6 +19,7 @@ package org.apache.james.mailbox.cassandra.modules; +import static com.datastax.oss.driver.api.core.type.DataTypes.INT; import static com.datastax.oss.driver.api.core.type.DataTypes.TEXT; import static com.datastax.oss.driver.api.core.type.DataTypes.TIMEUUID; import static com.datastax.oss.driver.api.core.type.DataTypes.frozenSetOf; @@ -39,16 +40,16 @@ public interface CassandraThreadModule { .comment("Related data needed for guessing threadId algorithm") .statement(statement -> types -> statement .withPartitionKey(USERNAME, TEXT) - .withPartitionKey(MIME_MESSAGE_ID, TEXT) + .withPartitionKey(MIME_MESSAGE_ID, INT) .withClusteringColumn(MESSAGE_ID, TIMEUUID) .withColumn(THREAD_ID, TIMEUUID) - .withColumn(BASE_SUBJECT, TEXT)) + .withColumn(BASE_SUBJECT, INT)) .table(CassandraThreadLookupTable.TABLE_NAME) .comment("Thread table lookup by messageId, using for deletion thread data") .statement(statement -> types -> statement .withPartitionKey(MESSAGE_ID, TIMEUUID) .withColumn(USERNAME, TEXT) - .withColumn(MIME_MESSAGE_IDS, frozenSetOf(TEXT))) + .withColumn(MIME_MESSAGE_IDS, frozenSetOf(INT))) .build(); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java index 53275ade14..2d8f734161 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java @@ -22,7 +22,7 @@ package 
org.apache.james.mailbox.cassandra.table; import com.datastax.oss.driver.api.core.CqlIdentifier; public interface CassandraThreadLookupTable { - String TABLE_NAME = "threadLookupTable"; + String TABLE_NAME = "thread_lookup_2"; CqlIdentifier MIME_MESSAGE_IDS = CqlIdentifier.fromCql("mimeMessageIds"); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadTable.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadTable.java index b437a9f35c..2b0e7e9a82 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadTable.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadTable.java @@ -22,7 +22,7 @@ package org.apache.james.mailbox.cassandra.table; import com.datastax.oss.driver.api.core.CqlIdentifier; public interface CassandraThreadTable { - String TABLE_NAME = "threadTable"; + String TABLE_NAME = "thread_2"; CqlIdentifier USERNAME = CqlIdentifier.fromCql("username"); diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java index 4b52c7819a..3ce0c57a54 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java @@ -20,6 +20,8 @@ package org.apache.james.mailbox.cassandra; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom; import static org.apache.james.backends.cassandra.Scenario.Builder.fail; +import static org.apache.james.mailbox.cassandra.mail.CassandraThreadDAOTest.hashMimeMessagesIds; +import static org.apache.james.mailbox.cassandra.mail.CassandraThreadDAOTest.hashSubject; import static org.assertj.core.api.Assertions.assertThat; import static 
org.mockito.Mockito.mock; @@ -842,9 +844,9 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai private Mono<Void> saveThreadData(Username username, Set<MimeMessageId> mimeMessageIds, MessageId messageId, ThreadId threadId, Optional<Subject> baseSubject) { return threadDAO(cassandra.getCassandraCluster()) - .insertSome(username, mimeMessageIds, messageId, threadId, baseSubject) + .insertSome(username, hashMimeMessagesIds(mimeMessageIds), messageId, threadId, hashSubject(baseSubject)) .then(threadLookupDAO(cassandra.getCassandraCluster()) - .insert(messageId, username, mimeMessageIds)); + .insert(messageId, username, hashMimeMessagesIds(mimeMessageIds))); } private Set<MimeMessageId> buildMimeMessageIdSet(Optional<MimeMessageId> mimeMessageId, Optional<MimeMessageId> inReplyTo, Optional<List<MimeMessageId>> references) { diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithmTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithmTest.java index f104dad4be..546657676f 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithmTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithmTest.java @@ -19,6 +19,8 @@ package org.apache.james.mailbox.cassandra; +import static org.apache.james.mailbox.cassandra.mail.CassandraThreadDAOTest.hashMimeMessagesIds; +import static org.apache.james.mailbox.cassandra.mail.CassandraThreadDAOTest.hashSubject; import static org.assertj.core.api.Assertions.assertThat; import java.util.List; @@ -93,7 +95,7 @@ public class CassandraThreadIdGuessingAlgorithmTest extends ThreadIdGuessingAlgo @Override protected Flux<Void> saveThreadData(Username username, Set<MimeMessageId> mimeMessageIds, MessageId messageId, ThreadId threadId, Optional<Subject> baseSubject) { - return 
threadDAO.insertSome(username, mimeMessageIds, messageId, threadId, baseSubject); + return threadDAO.insertSome(username, hashMimeMessagesIds(mimeMessageIds), messageId, threadId, hashSubject(baseSubject)); } @Test @@ -110,6 +112,6 @@ public class CassandraThreadIdGuessingAlgorithmTest extends ThreadIdGuessingAlgo Optional.of(List.of(new MimeMessageId("someReferences"), new MimeMessageId("Message-ID1")))); assertThat(threadLookupDAO.selectOneRow(newBasedMessageId).block()) - .isEqualTo(new ThreadTablePartitionKey(username, mimeMessageIds)); + .isEqualTo(new ThreadTablePartitionKey(username, hashMimeMessagesIds(mimeMessageIds))); } } diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAOTest.java index 1eee881592..f6bac39978 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAOTest.java @@ -22,6 +22,8 @@ package org.apache.james.mailbox.cassandra.mail; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import org.apache.james.backends.cassandra.CassandraCluster; @@ -38,8 +40,19 @@ import org.junit.jupiter.api.extension.RegisterExtension; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.hash.Hashing; public class CassandraThreadDAOTest { + public static Set<Integer> hashMimeMessagesIds(Set<MimeMessageId> mimeMessageIds) { + return mimeMessageIds.stream() + .map(mimeMessageId -> Hashing.murmur3_32_fixed().hashBytes(mimeMessageId.getValue().getBytes()).asInt()) + .collect(Collectors.toSet()); + } + + public static Optional<Integer> 
hashSubject(Optional<Subject> baseSubjectOptional) { + return baseSubjectOptional.map(baseSubject -> Hashing.murmur3_32_fixed().hashBytes(baseSubject.getValue().getBytes()).asInt()); + } + private static final Username ALICE = Username.of("alice"); private static final Username BOB = Username.of("bob"); @@ -78,35 +91,38 @@ public class CassandraThreadDAOTest { } @Test - void insertShouldSuccess() { + void insertShouldSucceed() { Optional<Subject> message1BaseSubject = Optional.of(new Subject("subject")); - testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId1), messageId1, threadId1, message1BaseSubject).collectList().block(); + testee.insertSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1)), messageId1, threadId1, hashSubject(message1BaseSubject)) + .collectList().block(); - assertThat(testee.selectSome(ALICE, ImmutableSet.of(mimeMessageId1)).collectList().block()) - .isEqualTo(ImmutableList.of(Pair.of(message1BaseSubject, threadId1))); + assertThat(testee.selectSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1))).collectList().block()) + .isEqualTo(ImmutableList.of(Pair.of(hashSubject(message1BaseSubject), threadId1))); } @Test void insertNullBaseSubjectShouldBeAllowed() { Optional<Subject> message1BaseSubject = Optional.empty(); - testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId1), messageId1, threadId1, message1BaseSubject).collectList().block(); + testee.insertSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1)), messageId1, threadId1, hashSubject(message1BaseSubject)) + .collectList().block(); - assertThat(testee.selectSome(ALICE, ImmutableSet.of(mimeMessageId1)).collectList().block()) + assertThat(testee.selectSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1))).collectList().block()) .isEqualTo(ImmutableList.of(Pair.of(Optional.empty(), threadId1))); } @Test void insertEmptyBaseSubjectShouldBeAllowed() { Optional<Subject> message1BaseSubject = Optional.of(new Subject("")); - 
testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId1), messageId1, threadId1, message1BaseSubject).collectList().block(); + testee.insertSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1)), messageId1, threadId1, hashSubject(message1BaseSubject)) + .collectList().block(); - assertThat(testee.selectSome(ALICE, ImmutableSet.of(mimeMessageId1)).collectList().block()) - .isEqualTo(ImmutableList.of(Pair.of(message1BaseSubject, threadId1))); + assertThat(testee.selectSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1))).collectList().block()) + .isEqualTo(ImmutableList.of(Pair.of(hashSubject(message1BaseSubject), threadId1))); } @Test void selectShouldReturnEmptyByDefault() { - assertThat(testee.selectSome(ALICE, ImmutableSet.of(mimeMessageId1)).collectList().block() + assertThat(testee.selectSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1))).collectList().block() .isEmpty()); } @@ -115,61 +131,69 @@ public class CassandraThreadDAOTest { Optional<Subject> messageBaseSubject = Optional.of(new Subject("subject")); // given message1 and message2 belongs to same thread, related to each other by mimeMessageId2, mimeMessageId3 - testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId1, mimeMessageId2, mimeMessageId3), messageId1, threadId1, messageBaseSubject).collectList().block(); - testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId2, mimeMessageId3, mimeMessageId4), messageId2, threadId1, messageBaseSubject).collectList().block(); + testee.insertSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1, mimeMessageId2, mimeMessageId3)), messageId1, threadId1, hashSubject(messageBaseSubject)) + .collectList().block(); + testee.insertSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId2, mimeMessageId3, mimeMessageId4)), messageId2, threadId1, hashSubject(messageBaseSubject)) + .collectList().block(); // select with new message having mimeMessageId2 and mimeMessageId3 - assertThat(testee.selectSome(ALICE, 
ImmutableSet.of(mimeMessageId2, mimeMessageId3)).collectList().block()) - .isEqualTo(ImmutableList.of(Pair.of(messageBaseSubject, threadId1))); + assertThat(testee.selectSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId2, mimeMessageId3))).collectList().block()) + .isEqualTo(ImmutableList.of(Pair.of(hashSubject(messageBaseSubject), threadId1))); } @Test void selectShouldReturnOnlyRelatedMessageDataOfAUser() { // insert message1 data of ALICE Optional<Subject> message1BaseSubject = Optional.of(new Subject("subject")); - testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId1), messageId1, threadId1, message1BaseSubject).collectList().block(); + testee.insertSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1)), messageId1, threadId1, hashSubject(message1BaseSubject)) + .collectList().block(); // insert message2 data of BOB Optional<Subject> message2BaseSubject = Optional.of(new Subject("subject2")); - testee.insertSome(BOB, ImmutableSet.of(mimeMessageId2), messageId2, threadId2, message2BaseSubject).collectList().block(); + testee.insertSome(BOB, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId2)), messageId2, threadId2, hashSubject(message2BaseSubject)).collectList().block(); // select some data of BOB - assertThat(testee.selectSome(BOB, ImmutableSet.of(mimeMessageId2)).collectList().block()) - .isEqualTo(ImmutableList.of(Pair.of(message2BaseSubject, threadId2))); + assertThat(testee.selectSome(BOB, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId2))) + .collectList().block()) + .isEqualTo(ImmutableList.of(Pair.of(hashSubject(message2BaseSubject), threadId2))); } @Test void selectShouldReturnOnlyRelatedMessageDataOfAThread() { // insert message1 data of ALICE which in thread1 Optional<Subject> message1BaseSubject = Optional.of(new Subject("subject")); - testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId1), messageId1, threadId1, message1BaseSubject).collectList().block(); + testee.insertSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1)), messageId1, threadId1, hashSubject(message1BaseSubject)) + .collectList().block(); // insert message2 data of ALICE which in thread2 Optional<Subject> message2BaseSubject = Optional.of(new Subject("subject2")); - testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId2), messageId2, threadId2, message2BaseSubject).collectList().block(); + testee.insertSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId2)), messageId2, threadId2, hashSubject(message2BaseSubject)) + .collectList().block(); // select some data related to thread2 - assertThat(testee.selectSome(ALICE, ImmutableSet.of(mimeMessageId2)).collectList().block()) - .isEqualTo(ImmutableList.of(Pair.of(message2BaseSubject, threadId2))); + assertThat(testee.selectSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId2))).collectList().block()) + .isEqualTo(ImmutableList.of(Pair.of(hashSubject(message2BaseSubject), threadId2))); } @Test void selectWithUnrelatedMimeMessageIDsShouldReturnEmpty() { Optional<Subject> message1BaseSubject = Optional.of(new Subject("subject")); - testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId1, mimeMessageId2), messageId1, threadId1, message1BaseSubject).collectList().block(); + testee.insertSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1, mimeMessageId2)), messageId1, threadId1, hashSubject(message1BaseSubject)) + .collectList().block(); - assertThat(testee.selectSome(ALICE, ImmutableSet.of(mimeMessageId3, mimeMessageId4, mimeMessageId5)).collectList().block()) + assertThat(testee.selectSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId3, mimeMessageId4, mimeMessageId5))).collectList().block()) .isEqualTo(ImmutableList.of()); } @Test void deletedEntriesShouldNotBeReturned() { Optional<Subject> message1BaseSubject = Optional.of(new Subject("subject")); - testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId1, mimeMessageId2), messageId1, threadId1, 
message1BaseSubject).collectList().block(); + testee.insertSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1, mimeMessageId2)), messageId1, threadId1, hashSubject(message1BaseSubject)) + .collectList().block(); - testee.deleteSome(ALICE, ImmutableSet.of(mimeMessageId1, mimeMessageId2)); + testee.deleteSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1, mimeMessageId2))); - assertThat(testee.selectSome(ALICE, ImmutableSet.of(mimeMessageId1, mimeMessageId2)).collectList().block() + assertThat(testee.selectSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1, mimeMessageId2))).collectList().block() .isEmpty()); } @@ -177,14 +201,16 @@ public class CassandraThreadDAOTest { void deleteWithUnrelatedMimeMessageIDsShouldDeleteNothing() { // insert message1 data Optional<Subject> message1BaseSubject = Optional.of(new Subject("subject")); - testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId1, mimeMessageId2), messageId1, threadId1, message1BaseSubject).collectList().block(); + testee.insertSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1, mimeMessageId2)), messageId1, threadId1, hashSubject(message1BaseSubject)) + .collectList().block(); // delete with unrelated mimemessageIds - testee.deleteSome(ALICE, ImmutableSet.of(mimeMessageId3, mimeMessageId4, mimeMessageId5)); + testee.deleteSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId3, mimeMessageId4, mimeMessageId5))) + .collectList().block(); // alice's data should remain - assertThat(testee.selectSome(ALICE, ImmutableSet.of(mimeMessageId1, mimeMessageId2)).collectList().block()) - .isEqualTo(ImmutableList.of(Pair.of(message1BaseSubject, threadId1))); + assertThat(testee.selectSome(ALICE, hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1, mimeMessageId2))).collectList().block()) + .isEqualTo(ImmutableList.of(Pair.of(hashSubject(message1BaseSubject), threadId1))); } } diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java index de4b6e01ea..80f4059e8b 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java @@ -19,6 +19,7 @@ package org.apache.james.mailbox.cassandra.mail; +import static org.apache.james.mailbox.cassandra.mail.CassandraThreadDAOTest.hashMimeMessagesIds; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import java.util.Set; @@ -33,7 +34,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -public class CassandraThreadLookupDAOTest { +class CassandraThreadLookupDAOTest { private static final Username ALICE = Username.of("alice"); private static final Username BOB = Username.of("bob"); @@ -64,10 +65,10 @@ public class CassandraThreadLookupDAOTest { @Test void insertShouldSuccess() { - testee.insert(messageId1, ALICE, Set.of(mimeMessageId1, mimeMessageId2)).block(); + testee.insert(messageId1, ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block(); assertThat(testee.selectOneRow(messageId1).block()) - .isEqualTo(new ThreadTablePartitionKey(ALICE, Set.of(mimeMessageId1, mimeMessageId2))); + .isEqualTo(new ThreadTablePartitionKey(ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2)))); } @Test @@ -78,16 +79,16 @@ public class CassandraThreadLookupDAOTest { @Test void selectShouldReturnOnlyRelatedDataByThatMessageId() { - testee.insert(messageId1, ALICE, Set.of(mimeMessageId1, mimeMessageId2)).block(); - testee.insert(messageId2, BOB, Set.of(mimeMessageId3, mimeMessageId4)).block(); + testee.insert(messageId1, ALICE, 
hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block(); + testee.insert(messageId2, BOB, hashMimeMessagesIds(Set.of(mimeMessageId3, mimeMessageId4))).block(); assertThat(testee.selectOneRow(messageId1).block()) - .isEqualTo(new ThreadTablePartitionKey(ALICE, Set.of(mimeMessageId1, mimeMessageId2))); + .isEqualTo(new ThreadTablePartitionKey(ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2)))); } @Test void deletedEntriesShouldNotBeReturned() { - testee.insert(messageId1, ALICE, Set.of(mimeMessageId1, mimeMessageId2)).block(); + testee.insert(messageId1, ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block(); testee.deleteOneRow(messageId1).block(); @@ -97,13 +98,13 @@ public class CassandraThreadLookupDAOTest { @Test void deleteByNonExistMessageIdShouldDeleteNothing() { - testee.insert(messageId1, ALICE, Set.of(mimeMessageId1, mimeMessageId2)).block(); + testee.insert(messageId1, ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block(); testee.deleteOneRow(messageId2).block(); // message1's data should remain assertThat(testee.selectOneRow(messageId1).block()) - .isEqualTo(new ThreadTablePartitionKey(ALICE, Set.of(mimeMessageId1, mimeMessageId2))); + .isEqualTo(new ThreadTablePartitionKey(ALICE, hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2)))); } } diff --git a/upgrade-instructions.md b/upgrade-instructions.md index 28f665eab4..ec868bc67f 100644 --- a/upgrade-instructions.md +++ b/upgrade-instructions.md @@ -21,6 +21,18 @@ Change list: - [Jmap uploads](#jmap-uploads) - [Mutualize quota table](#mutualize-quota-table) - [Drop Legacy Cassandra migrations](#drop-legacy-cassandra-migrations) + - [Improve CassandraThreadIdGuessingAlgorithm](#improve-cassandrathreadidguessingalgorithm) + +### Improve CassandraThreadIdGuessingAlgorithm +Date: 14/09/2023 + +JIRA: https://issues.apache.org/jira/browse/JAMES-3937 + +Concerned products: Distributed James, Cassandra James Server + +Following the 
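CassandraThreadIdGuessingAlgorithm improvement, which now stores hashes in the new `thread_2` and `thread_lookup_2` tables, the legacy `threadTable` and `threadLookupTable` Cassandra tables are no longer used and can be dropped. + +Note that existing thread data is not migrated, which causes a discontinuity in thread allocation: a message received after the upgrade that replies to a pre-upgrade conversation starts a new thread, so that conversation ends up as 2 threads instead of one. This seems acceptable and preferable to a complex migration in our eyes. ### Drop Legacy Cassandra migrations --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org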