This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new b56acecd7d JAMES-3937 Use hash for CassandraThreadIdGuessingAlgorithm (#1727)
b56acecd7d is described below

commit b56acecd7d6f8cd09e14a31315e3285af18a50fc
Author: Trần Hồng Quân <55171818+quantranhong1...@users.noreply.github.com>
AuthorDate: Fri Sep 15 15:43:00 2023 +0700

    JAMES-3937 Use hash for CassandraThreadIdGuessingAlgorithm (#1727)
---
 .../CassandraThreadIdGuessingAlgorithm.java        | 21 ++++--
 .../mailbox/cassandra/mail/CassandraThreadDAO.java | 26 +++----
 .../cassandra/mail/CassandraThreadLookupDAO.java   | 13 ++--
 .../cassandra/mail/ThreadTablePartitionKey.java    |  7 +-
 .../cassandra/modules/CassandraThreadModule.java   |  7 +-
 .../table/CassandraThreadLookupTable.java          |  2 +-
 .../cassandra/table/CassandraThreadTable.java      |  2 +-
 .../cassandra/CassandraMailboxManagerTest.java     |  6 +-
 .../CassandraThreadIdGuessingAlgorithmTest.java    |  6 +-
 .../cassandra/mail/CassandraThreadDAOTest.java     | 88 ++++++++++++++--------
 .../mail/CassandraThreadLookupDAOTest.java         | 19 ++---
 upgrade-instructions.md                            | 12 +++
 12 files changed, 128 insertions(+), 81 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithm.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithm.java
index 872fc5edb3..b61fb52e53 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithm.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithm.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import javax.inject.Inject;
 
@@ -41,6 +42,8 @@ import 
org.apache.james.mailbox.store.mail.model.MimeMessageId;
 import org.apache.james.mailbox.store.mail.model.Subject;
 import org.apache.james.mailbox.store.search.SearchUtil;
 
+import com.google.common.hash.Hashing;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -58,16 +61,22 @@ public class CassandraThreadIdGuessingAlgorithm implements 
ThreadIdGuessingAlgor
 
     @Override
     public Mono<ThreadId> guessThreadIdReactive(MessageId messageId, 
Optional<MimeMessageId> mimeMessageId, Optional<MimeMessageId> inReplyTo, 
Optional<List<MimeMessageId>> references, Optional<Subject> subject, 
MailboxSession session) {
-        Set<MimeMessageId> mimeMessageIds = 
buildMimeMessageIdSet(mimeMessageId, inReplyTo, references);
-        Optional<Subject> baseSubject = subject.map(value -> new 
Subject(SearchUtil.getBaseSubject(value.getValue())));
-        return Flux.from(threadDAO.selectSome(session.getUser(), 
mimeMessageIds))
-            .filter(pair -> pair.getLeft().equals(baseSubject))
+        Set<Integer> hashMimeMessageIds = buildMimeMessageIdSet(mimeMessageId, 
inReplyTo, references)
+            .stream()
+            .map(mimeMessageId1 -> 
Hashing.murmur3_32_fixed().hashBytes(mimeMessageId1.getValue().getBytes()).asInt())
+            .collect(Collectors.toSet());
+
+        Optional<Integer> hashBaseSubject = subject.map(value -> new 
Subject(SearchUtil.getBaseSubject(value.getValue())))
+            .map(subject1 -> 
Hashing.murmur3_32_fixed().hashBytes(subject1.getValue().getBytes()).asInt());
+
+        return Flux.from(threadDAO.selectSome(session.getUser(), 
hashMimeMessageIds))
+            .filter(pair -> pair.getLeft().equals(hashBaseSubject))
             .next()
             .map(Pair::getRight)
             .switchIfEmpty(Mono.just(ThreadId.fromBaseMessageId(messageId)))
             .flatMap(threadId -> threadDAO
-                .insertSome(session.getUser(), mimeMessageIds, messageId, 
threadId, baseSubject)
-                .then(threadLookupDAO.insert(messageId, session.getUser(), 
mimeMessageIds))
+                .insertSome(session.getUser(), hashMimeMessageIds, messageId, 
threadId, hashBaseSubject)
+                .then(threadLookupDAO.insert(messageId, session.getUser(), 
hashMimeMessageIds))
                 .then(Mono.just(threadId)));
     }
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAO.java
index 5af4b990c1..7a9af5a18c 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAO.java
@@ -43,8 +43,6 @@ import org.apache.james.core.Username;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.ThreadId;
-import org.apache.james.mailbox.store.mail.model.MimeMessageId;
-import org.apache.james.mailbox.store.mail.model.Subject;
 
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
@@ -83,35 +81,35 @@ public class CassandraThreadDAO {
             .build());
     }
 
-    public Flux<Void> insertSome(Username username, Set<MimeMessageId> 
mimeMessageIds, MessageId messageId, ThreadId threadId, Optional<Subject> 
baseSubject) {
-        return Flux.fromIterable(mimeMessageIds)
+    public Flux<Void> insertSome(Username username, Set<Integer> 
hashMimeMessageIds, MessageId messageId, ThreadId threadId, Optional<Integer> 
hashBaseSubject) {
+        return Flux.fromIterable(hashMimeMessageIds)
             .flatMap(mimeMessageId -> executor.executeVoid(insertOne.bind()
                 .set(USERNAME, username.asString(), TypeCodecs.TEXT)
-                .set(MIME_MESSAGE_ID, mimeMessageId.getValue(), 
TypeCodecs.TEXT)
+                .set(MIME_MESSAGE_ID, mimeMessageId, TypeCodecs.INT)
                 .set(MESSAGE_ID, ((CassandraMessageId) messageId).get(), 
TypeCodecs.TIMEUUID)
                 .set(THREAD_ID, ((CassandraMessageId) 
threadId.getBaseMessageId()).get(), TypeCodecs.TIMEUUID)
-                .set(BASE_SUBJECT, 
baseSubject.map(Subject::getValue).orElse(null), TypeCodecs.TEXT)), 
DEFAULT_CONCURRENCY);
+                .set(BASE_SUBJECT, hashBaseSubject.orElse(null), 
TypeCodecs.INT)), DEFAULT_CONCURRENCY);
     }
 
-    public Flux<Pair<Optional<Subject>, ThreadId>> selectSome(Username 
username, Set<MimeMessageId> mimeMessageIds) {
-        return Flux.fromIterable(mimeMessageIds)
+    public Flux<Pair<Optional<Integer>, ThreadId>> selectSome(Username 
username, Set<Integer> hashMimeMessageIds) {
+        return Flux.fromIterable(hashMimeMessageIds)
             .flatMap(mimeMessageId -> executor
                 .executeSingleRow(selectOne.bind()
                     .set(USERNAME, username.asString(), TypeCodecs.TEXT)
-                    .set(MIME_MESSAGE_ID, mimeMessageId.getValue(), 
TypeCodecs.TEXT))
+                    .set(MIME_MESSAGE_ID, mimeMessageId, TypeCodecs.INT))
                 .map(this::readRow), DEFAULT_CONCURRENCY)
             .distinct();
     }
 
-    public Flux<Void> deleteSome(Username username, Set<MimeMessageId> 
mimeMessageIds) {
-        return Flux.fromIterable(mimeMessageIds)
+    public Flux<Void> deleteSome(Username username, Set<Integer> 
hashMimeMessageIds) {
+        return Flux.fromIterable(hashMimeMessageIds)
             .flatMap(mimeMessageId -> executor.executeVoid(deleteOne.bind()
                 .set(USERNAME, username.asString(), TypeCodecs.TEXT)
-                .set(MIME_MESSAGE_ID, mimeMessageId.getValue(), 
TypeCodecs.TEXT)));
+                .set(MIME_MESSAGE_ID, mimeMessageId, TypeCodecs.INT)));
     }
 
-    public Pair<Optional<Subject>, ThreadId> readRow(Row row) {
-        return 
Pair.of(Optional.ofNullable(row.getString(BASE_SUBJECT)).map(Subject::new),
+    public Pair<Optional<Integer>, ThreadId> readRow(Row row) {
+        return Pair.of(Optional.ofNullable(row.get(BASE_SUBJECT, 
Integer.class)),
             
ThreadId.fromBaseMessageId(CassandraMessageId.Factory.of(row.getUuid(THREAD_ID))));
     }
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java
index 5913c0622b..e95c7e9c00 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAO.java
@@ -19,7 +19,7 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
-import static com.datastax.oss.driver.api.core.type.DataTypes.TEXT;
+import static com.datastax.oss.driver.api.core.type.DataTypes.INT;
 import static com.datastax.oss.driver.api.core.type.DataTypes.frozenSetOf;
 import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
 import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
@@ -39,7 +39,6 @@ import 
org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
 import org.apache.james.mailbox.model.MessageId;
-import org.apache.james.mailbox.store.mail.model.MimeMessageId;
 
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
@@ -52,7 +51,7 @@ import com.google.common.collect.ImmutableSet;
 import reactor.core.publisher.Mono;
 
 public class CassandraThreadLookupDAO {
-    private static final TypeCodec<Set<String>> SET_OF_STRINGS_CODEC = 
CodecRegistry.DEFAULT.codecFor(frozenSetOf(TEXT));
+    private static final TypeCodec<Set<Integer>> SET_OF_INTS_CODEC = 
CodecRegistry.DEFAULT.codecFor(frozenSetOf(INT));
 
     private final CassandraAsyncExecutor executor;
     private final PreparedStatement insert;
@@ -79,12 +78,11 @@ public class CassandraThreadLookupDAO {
             .build());
     }
 
-    public Mono<Void> insert(MessageId messageId, Username username, 
Set<MimeMessageId> mimeMessageIds) {
-        Set<String> mimeMessageIdsString = 
mimeMessageIds.stream().map(MimeMessageId::getValue).collect(ImmutableSet.toImmutableSet());
+    public Mono<Void> insert(MessageId messageId, Username username, 
Set<Integer> hashMimeMessageIds) {
         return executor.executeVoid(insert.bind()
             .set(MESSAGE_ID, ((CassandraMessageId) messageId).get(), 
TypeCodecs.TIMEUUID)
             .set(USERNAME, username.asString(), TypeCodecs.TEXT)
-            .set(MIME_MESSAGE_IDS, mimeMessageIdsString, 
SET_OF_STRINGS_CODEC));
+            .set(MIME_MESSAGE_IDS, hashMimeMessageIds, SET_OF_INTS_CODEC));
     }
 
     public Mono<ThreadTablePartitionKey> selectOneRow(MessageId messageId) {
@@ -99,9 +97,8 @@ public class CassandraThreadLookupDAO {
     }
 
     private ThreadTablePartitionKey readRow(Row row) {
-        Set<MimeMessageId> mimeMessageIds = row.get(MIME_MESSAGE_IDS, 
SET_OF_STRINGS_CODEC)
+        Set<Integer> mimeMessageIds = row.get(MIME_MESSAGE_IDS, 
SET_OF_INTS_CODEC)
             .stream()
-            .map(MimeMessageId::new)
             .collect(ImmutableSet.toImmutableSet());
         return new 
ThreadTablePartitionKey(Username.of(row.getString(USERNAME)), mimeMessageIds);
     }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/ThreadTablePartitionKey.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/ThreadTablePartitionKey.java
index f6913580ec..86da94008b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/ThreadTablePartitionKey.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/ThreadTablePartitionKey.java
@@ -23,13 +23,12 @@ import java.util.Objects;
 import java.util.Set;
 
 import org.apache.james.core.Username;
-import org.apache.james.mailbox.store.mail.model.MimeMessageId;
 
 public class ThreadTablePartitionKey {
     private final Username username;
-    private final Set<MimeMessageId> mimeMessageIds;
+    private final Set<Integer> mimeMessageIds;
 
-    public ThreadTablePartitionKey(Username username, Set<MimeMessageId> 
mimeMessageIds) {
+    public ThreadTablePartitionKey(Username username, Set<Integer> 
mimeMessageIds) {
         this.username = username;
         this.mimeMessageIds = mimeMessageIds;
     }
@@ -38,7 +37,7 @@ public class ThreadTablePartitionKey {
         return username;
     }
 
-    public Set<MimeMessageId> getMimeMessageIds() {
+    public Set<Integer> getMimeMessageIds() {
         return mimeMessageIds;
     }
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadModule.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadModule.java
index b323be9419..efc53457d9 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadModule.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraThreadModule.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.cassandra.modules;
 
+import static com.datastax.oss.driver.api.core.type.DataTypes.INT;
 import static com.datastax.oss.driver.api.core.type.DataTypes.TEXT;
 import static com.datastax.oss.driver.api.core.type.DataTypes.TIMEUUID;
 import static com.datastax.oss.driver.api.core.type.DataTypes.frozenSetOf;
@@ -39,16 +40,16 @@ public interface CassandraThreadModule {
         .comment("Related data needed for guessing threadId algorithm")
         .statement(statement -> types -> statement
             .withPartitionKey(USERNAME, TEXT)
-            .withPartitionKey(MIME_MESSAGE_ID, TEXT)
+            .withPartitionKey(MIME_MESSAGE_ID, INT)
             .withClusteringColumn(MESSAGE_ID, TIMEUUID)
             .withColumn(THREAD_ID, TIMEUUID)
-            .withColumn(BASE_SUBJECT, TEXT))
+            .withColumn(BASE_SUBJECT, INT))
         .table(CassandraThreadLookupTable.TABLE_NAME)
         .comment("Thread table lookup by messageId, using for deletion thread 
data")
         .statement(statement -> types -> statement
             .withPartitionKey(MESSAGE_ID, TIMEUUID)
             .withColumn(USERNAME, TEXT)
-            .withColumn(MIME_MESSAGE_IDS, frozenSetOf(TEXT)))
+            .withColumn(MIME_MESSAGE_IDS, frozenSetOf(INT)))
         .build();
 
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java
index 53275ade14..2d8f734161 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadLookupTable.java
@@ -22,7 +22,7 @@ package org.apache.james.mailbox.cassandra.table;
 import com.datastax.oss.driver.api.core.CqlIdentifier;
 
 public interface CassandraThreadLookupTable {
-    String TABLE_NAME = "threadLookupTable";
+    String TABLE_NAME = "thread_lookup_2";
 
     CqlIdentifier MIME_MESSAGE_IDS = CqlIdentifier.fromCql("mimeMessageIds");
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadTable.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadTable.java
index b437a9f35c..2b0e7e9a82 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadTable.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraThreadTable.java
@@ -22,7 +22,7 @@ package org.apache.james.mailbox.cassandra.table;
 import com.datastax.oss.driver.api.core.CqlIdentifier;
 
 public interface CassandraThreadTable {
-    String TABLE_NAME = "threadTable";
+    String TABLE_NAME = "thread_2";
 
     CqlIdentifier USERNAME = CqlIdentifier.fromCql("username");
 
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
index 4b52c7819a..3ce0c57a54 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
@@ -20,6 +20,8 @@ package org.apache.james.mailbox.cassandra;
 
 import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
 import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
+import static 
org.apache.james.mailbox.cassandra.mail.CassandraThreadDAOTest.hashMimeMessagesIds;
+import static 
org.apache.james.mailbox.cassandra.mail.CassandraThreadDAOTest.hashSubject;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 
@@ -842,9 +844,9 @@ public class CassandraMailboxManagerTest extends 
MailboxManagerTest<CassandraMai
 
         private Mono<Void> saveThreadData(Username username, 
Set<MimeMessageId> mimeMessageIds, MessageId messageId, ThreadId threadId, 
Optional<Subject> baseSubject) {
             return threadDAO(cassandra.getCassandraCluster())
-                .insertSome(username, mimeMessageIds, messageId, threadId, 
baseSubject)
+                .insertSome(username, hashMimeMessagesIds(mimeMessageIds), 
messageId, threadId, hashSubject(baseSubject))
                 .then(threadLookupDAO(cassandra.getCassandraCluster())
-                    .insert(messageId, username, mimeMessageIds));
+                    .insert(messageId, username, 
hashMimeMessagesIds(mimeMessageIds)));
         }
 
         private Set<MimeMessageId> 
buildMimeMessageIdSet(Optional<MimeMessageId> mimeMessageId, 
Optional<MimeMessageId> inReplyTo, Optional<List<MimeMessageId>> references) {
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithmTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithmTest.java
index f104dad4be..546657676f 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithmTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraThreadIdGuessingAlgorithmTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.mailbox.cassandra;
 
+import static 
org.apache.james.mailbox.cassandra.mail.CassandraThreadDAOTest.hashMimeMessagesIds;
+import static 
org.apache.james.mailbox.cassandra.mail.CassandraThreadDAOTest.hashSubject;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.List;
@@ -93,7 +95,7 @@ public class CassandraThreadIdGuessingAlgorithmTest extends 
ThreadIdGuessingAlgo
 
     @Override
     protected Flux<Void> saveThreadData(Username username, Set<MimeMessageId> 
mimeMessageIds, MessageId messageId, ThreadId threadId, Optional<Subject> 
baseSubject) {
-        return threadDAO.insertSome(username, mimeMessageIds, messageId, 
threadId, baseSubject);
+        return threadDAO.insertSome(username, 
hashMimeMessagesIds(mimeMessageIds), messageId, threadId, 
hashSubject(baseSubject));
     }
 
     @Test
@@ -110,6 +112,6 @@ public class CassandraThreadIdGuessingAlgorithmTest extends 
ThreadIdGuessingAlgo
             Optional.of(List.of(new MimeMessageId("someReferences"), new 
MimeMessageId("Message-ID1"))));
 
         assertThat(threadLookupDAO.selectOneRow(newBasedMessageId).block())
-            .isEqualTo(new ThreadTablePartitionKey(username, mimeMessageIds));
+            .isEqualTo(new ThreadTablePartitionKey(username, 
hashMimeMessagesIds(mimeMessageIds)));
     }
 }
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAOTest.java
index 1eee881592..f6bac39978 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadDAOTest.java
@@ -22,6 +22,8 @@ package org.apache.james.mailbox.cassandra.mail;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.CassandraCluster;
@@ -38,8 +40,19 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.hash.Hashing;
 
 public class CassandraThreadDAOTest {
+    public static Set<Integer> hashMimeMessagesIds(Set<MimeMessageId> 
mimeMessageIds) {
+        return mimeMessageIds.stream()
+            .map(mimeMessageId -> 
Hashing.murmur3_32_fixed().hashBytes(mimeMessageId.getValue().getBytes()).asInt())
+            .collect(Collectors.toSet());
+    }
+
+    public static Optional<Integer> hashSubject(Optional<Subject> 
baseSubjectOptional) {
+        return baseSubjectOptional.map(baseSubject -> 
Hashing.murmur3_32_fixed().hashBytes(baseSubject.getValue().getBytes()).asInt());
+    }
+
     private static final Username ALICE = Username.of("alice");
     private static final Username BOB = Username.of("bob");
 
@@ -78,35 +91,38 @@ public class CassandraThreadDAOTest {
     }
 
     @Test
-    void insertShouldSuccess() {
+    void insertShouldSucceed() {
         Optional<Subject> message1BaseSubject = Optional.of(new 
Subject("subject"));
-        testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId1), messageId1, 
threadId1, message1BaseSubject).collectList().block();
+        testee.insertSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1)), messageId1, threadId1, 
hashSubject(message1BaseSubject))
+            .collectList().block();
 
-        assertThat(testee.selectSome(ALICE, 
ImmutableSet.of(mimeMessageId1)).collectList().block())
-            .isEqualTo(ImmutableList.of(Pair.of(message1BaseSubject, 
threadId1)));
+        assertThat(testee.selectSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1))).collectList().block())
+            
.isEqualTo(ImmutableList.of(Pair.of(hashSubject(message1BaseSubject), 
threadId1)));
     }
 
     @Test
     void insertNullBaseSubjectShouldBeAllowed() {
         Optional<Subject> message1BaseSubject = Optional.empty();
-        testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId1), messageId1, 
threadId1, message1BaseSubject).collectList().block();
+        testee.insertSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1)), messageId1, threadId1, 
hashSubject(message1BaseSubject))
+            .collectList().block();
 
-        assertThat(testee.selectSome(ALICE, 
ImmutableSet.of(mimeMessageId1)).collectList().block())
+        assertThat(testee.selectSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1))).collectList().block())
             .isEqualTo(ImmutableList.of(Pair.of(Optional.empty(), threadId1)));
     }
 
     @Test
     void insertEmptyBaseSubjectShouldBeAllowed() {
         Optional<Subject> message1BaseSubject = Optional.of(new Subject(""));
-        testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId1), messageId1, 
threadId1, message1BaseSubject).collectList().block();
+        testee.insertSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1)), messageId1, threadId1, 
hashSubject(message1BaseSubject))
+            .collectList().block();
 
-        assertThat(testee.selectSome(ALICE, 
ImmutableSet.of(mimeMessageId1)).collectList().block())
-            .isEqualTo(ImmutableList.of(Pair.of(message1BaseSubject, 
threadId1)));
+        assertThat(testee.selectSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1))).collectList().block())
+            
.isEqualTo(ImmutableList.of(Pair.of(hashSubject(message1BaseSubject), 
threadId1)));
     }
 
     @Test
     void selectShouldReturnEmptyByDefault() {
-        assertThat(testee.selectSome(ALICE, 
ImmutableSet.of(mimeMessageId1)).collectList().block()
+        assertThat(testee.selectSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1))).collectList().block()
             .isEmpty());
     }
 
@@ -115,61 +131,69 @@ public class CassandraThreadDAOTest {
         Optional<Subject> messageBaseSubject = Optional.of(new 
Subject("subject"));
 
         // given message1 and message2 belongs to same thread, related to each 
other by mimeMessageId2, mimeMessageId3
-        testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId1, 
mimeMessageId2, mimeMessageId3), messageId1, threadId1, 
messageBaseSubject).collectList().block();
-        testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId2, 
mimeMessageId3, mimeMessageId4), messageId2, threadId1, 
messageBaseSubject).collectList().block();
+        testee.insertSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1, mimeMessageId2, 
mimeMessageId3)), messageId1, threadId1, hashSubject(messageBaseSubject))
+            .collectList().block();
+        testee.insertSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId2, mimeMessageId3, 
mimeMessageId4)), messageId2, threadId1, hashSubject(messageBaseSubject))
+            .collectList().block();
 
         // select with new message having mimeMessageId2 and mimeMessageId3
-        assertThat(testee.selectSome(ALICE, ImmutableSet.of(mimeMessageId2, 
mimeMessageId3)).collectList().block())
-            .isEqualTo(ImmutableList.of(Pair.of(messageBaseSubject, 
threadId1)));
+        assertThat(testee.selectSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId2, 
mimeMessageId3))).collectList().block())
+            
.isEqualTo(ImmutableList.of(Pair.of(hashSubject(messageBaseSubject), 
threadId1)));
     }
 
     @Test
     void selectShouldReturnOnlyRelatedMessageDataOfAUser() {
         // insert message1 data of ALICE
         Optional<Subject> message1BaseSubject = Optional.of(new 
Subject("subject"));
-        testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId1), messageId1, 
threadId1, message1BaseSubject).collectList().block();
+        testee.insertSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1)), messageId1, threadId1, 
hashSubject(message1BaseSubject))
+            .collectList().block();
 
         // insert message2 data of BOB
         Optional<Subject> message2BaseSubject = Optional.of(new 
Subject("subject2"));
-        testee.insertSome(BOB, ImmutableSet.of(mimeMessageId2), messageId2, 
threadId2, message2BaseSubject).collectList().block();
+        testee.insertSome(BOB, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId2)), messageId2, threadId2, 
hashSubject(message2BaseSubject)).collectList().block();
 
         // select some data of BOB
-        assertThat(testee.selectSome(BOB, 
ImmutableSet.of(mimeMessageId2)).collectList().block())
-            .isEqualTo(ImmutableList.of(Pair.of(message2BaseSubject, 
threadId2)));
+        assertThat(testee.selectSome(BOB, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId2)))
+            .collectList().block())
+            
.isEqualTo(ImmutableList.of(Pair.of(hashSubject(message2BaseSubject), 
threadId2)));
     }
 
     @Test
     void selectShouldReturnOnlyRelatedMessageDataOfAThread() {
         // insert message1 data of ALICE which in thread1
         Optional<Subject> message1BaseSubject = Optional.of(new 
Subject("subject"));
-        testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId1), messageId1, 
threadId1, message1BaseSubject).collectList().block();
+        testee.insertSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1)), messageId1, threadId1, 
hashSubject(message1BaseSubject))
+            .collectList().block();
 
         // insert message2 data of ALICE which in thread2
         Optional<Subject> message2BaseSubject = Optional.of(new 
Subject("subject2"));
-        testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId2), messageId2, 
threadId2, message2BaseSubject).collectList().block();
+        testee.insertSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId2)), messageId2, threadId2, 
hashSubject(message2BaseSubject))
+            .collectList().block();
 
         // select some data related to thread2
-        assertThat(testee.selectSome(ALICE, 
ImmutableSet.of(mimeMessageId2)).collectList().block())
-            .isEqualTo(ImmutableList.of(Pair.of(message2BaseSubject, 
threadId2)));
+        assertThat(testee.selectSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId2))).collectList().block())
+            
.isEqualTo(ImmutableList.of(Pair.of(hashSubject(message2BaseSubject), 
threadId2)));
     }
 
     @Test
     void selectWithUnrelatedMimeMessageIDsShouldReturnEmpty() {
         Optional<Subject> message1BaseSubject = Optional.of(new 
Subject("subject"));
-        testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId1, 
mimeMessageId2), messageId1, threadId1, 
message1BaseSubject).collectList().block();
+        testee.insertSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1, mimeMessageId2)), 
messageId1, threadId1, hashSubject(message1BaseSubject))
+            .collectList().block();
 
-        assertThat(testee.selectSome(ALICE, ImmutableSet.of(mimeMessageId3, 
mimeMessageId4, mimeMessageId5)).collectList().block())
+        assertThat(testee.selectSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId3, mimeMessageId4, 
mimeMessageId5))).collectList().block())
             .isEqualTo(ImmutableList.of());
     }
 
     @Test
     void deletedEntriesShouldNotBeReturned() {
         Optional<Subject> message1BaseSubject = Optional.of(new 
Subject("subject"));
-        testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId1, 
mimeMessageId2), messageId1, threadId1, 
message1BaseSubject).collectList().block();
+        testee.insertSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1, mimeMessageId2)), 
messageId1, threadId1, hashSubject(message1BaseSubject))
+            .collectList().block();
 
-        testee.deleteSome(ALICE, ImmutableSet.of(mimeMessageId1, 
mimeMessageId2));
+        testee.deleteSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1, mimeMessageId2)));
 
-        assertThat(testee.selectSome(ALICE, ImmutableSet.of(mimeMessageId1, 
mimeMessageId2)).collectList().block()
+        assertThat(testee.selectSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1, 
mimeMessageId2))).collectList().block()
             .isEmpty());
     }
 
@@ -177,14 +201,16 @@ public class CassandraThreadDAOTest {
     void deleteWithUnrelatedMimeMessageIDsShouldDeleteNothing() {
         // insert message1 data
         Optional<Subject> message1BaseSubject = Optional.of(new 
Subject("subject"));
-        testee.insertSome(ALICE, ImmutableSet.of(mimeMessageId1, 
mimeMessageId2), messageId1, threadId1, 
message1BaseSubject).collectList().block();
+        testee.insertSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1, mimeMessageId2)), 
messageId1, threadId1, hashSubject(message1BaseSubject))
+            .collectList().block();
 
         // delete with unrelated mimemessageIds
-        testee.deleteSome(ALICE, ImmutableSet.of(mimeMessageId3, 
mimeMessageId4, mimeMessageId5));
+        testee.deleteSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId3, mimeMessageId4, 
mimeMessageId5)))
+            .collectList().block();
 
         // alice's data should remain
-        assertThat(testee.selectSome(ALICE, ImmutableSet.of(mimeMessageId1, 
mimeMessageId2)).collectList().block())
-            .isEqualTo(ImmutableList.of(Pair.of(message1BaseSubject, 
threadId1)));
+        assertThat(testee.selectSome(ALICE, 
hashMimeMessagesIds(ImmutableSet.of(mimeMessageId1, 
mimeMessageId2))).collectList().block())
+            
.isEqualTo(ImmutableList.of(Pair.of(hashSubject(message1BaseSubject), 
threadId1)));
     }
 
 }
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java
index de4b6e01ea..80f4059e8b 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraThreadLookupDAOTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
+import static 
org.apache.james.mailbox.cassandra.mail.CassandraThreadDAOTest.hashMimeMessagesIds;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
 import java.util.Set;
@@ -33,7 +34,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-public class CassandraThreadLookupDAOTest {
+class CassandraThreadLookupDAOTest {
     private static final Username ALICE = Username.of("alice");
     private static final Username BOB = Username.of("bob");
 
@@ -64,10 +65,10 @@ public class CassandraThreadLookupDAOTest {
 
     @Test
     void insertShouldSuccess() {
-        testee.insert(messageId1, ALICE, Set.of(mimeMessageId1, 
mimeMessageId2)).block();
+        testee.insert(messageId1, ALICE, 
hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block();
 
         assertThat(testee.selectOneRow(messageId1).block())
-            .isEqualTo(new ThreadTablePartitionKey(ALICE, 
Set.of(mimeMessageId1, mimeMessageId2)));
+            .isEqualTo(new ThreadTablePartitionKey(ALICE, 
hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))));
     }
 
     @Test
@@ -78,16 +79,16 @@ public class CassandraThreadLookupDAOTest {
 
     @Test
     void selectShouldReturnOnlyRelatedDataByThatMessageId() {
-        testee.insert(messageId1, ALICE, Set.of(mimeMessageId1, 
mimeMessageId2)).block();
-        testee.insert(messageId2, BOB, Set.of(mimeMessageId3, 
mimeMessageId4)).block();
+        testee.insert(messageId1, ALICE, 
hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block();
+        testee.insert(messageId2, BOB, 
hashMimeMessagesIds(Set.of(mimeMessageId3, mimeMessageId4))).block();
 
         assertThat(testee.selectOneRow(messageId1).block())
-            .isEqualTo(new ThreadTablePartitionKey(ALICE, 
Set.of(mimeMessageId1, mimeMessageId2)));
+            .isEqualTo(new ThreadTablePartitionKey(ALICE, 
hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))));
     }
 
     @Test
     void deletedEntriesShouldNotBeReturned() {
-        testee.insert(messageId1, ALICE, Set.of(mimeMessageId1, 
mimeMessageId2)).block();
+        testee.insert(messageId1, ALICE, 
hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block();
 
         testee.deleteOneRow(messageId1).block();
 
@@ -97,13 +98,13 @@ public class CassandraThreadLookupDAOTest {
 
     @Test
     void deleteByNonExistMessageIdShouldDeleteNothing() {
-        testee.insert(messageId1, ALICE, Set.of(mimeMessageId1, 
mimeMessageId2)).block();
+        testee.insert(messageId1, ALICE, 
hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))).block();
 
         testee.deleteOneRow(messageId2).block();
 
         // message1's data should remain
         assertThat(testee.selectOneRow(messageId1).block())
-            .isEqualTo(new ThreadTablePartitionKey(ALICE, 
Set.of(mimeMessageId1, mimeMessageId2)));
+            .isEqualTo(new ThreadTablePartitionKey(ALICE, 
hashMimeMessagesIds(Set.of(mimeMessageId1, mimeMessageId2))));
     }
 
 }
diff --git a/upgrade-instructions.md b/upgrade-instructions.md
index 28f665eab4..ec868bc67f 100644
--- a/upgrade-instructions.md
+++ b/upgrade-instructions.md
@@ -21,6 +21,18 @@ Change list:
  - [Jmap uploads](#jmap-uploads)
  - [Mutualize quota table](#mutualize-quota-table)
  - [Drop Legacy Cassandra migrations](#drop-legacy-cassandra-migrations)
+ - [Improve 
CassandraThreadIdGuessingAlgorithm](#improve-cassandrathreadidguessingalgorithm)
+
+### Improve CassandraThreadIdGuessingAlgorithm
+Date: 14/09/2023
+
+JIRA: https://issues.apache.org/jira/browse/JAMES-3937
+
+Concerned products: Distributed James, Cassandra James Server
+
+Following the CassandraThreadIdGuessingAlgorithm improvement, the `threadTable` 
and `threadLookupTable` Cassandra tables can be deleted.
+
+Note that this will cause a discontinuity in thread allocation: a conversation 
spanning the upgrade ends up split into 2 threads instead of one. We consider 
this acceptable and preferable to a complex migration.
 
 ### Drop Legacy Cassandra migrations
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org


Reply via email to