This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch 3.7.x
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 944e9a9970143247b72b15bd4aed9e0d3114abaa
Author: Benoit Tellier <btell...@linagora.com>
AuthorDate: Wed May 3 11:25:28 2023 +0700

    JAMES-3924 Test and fix browse start updates
    
     -> Propose unit tests for browseStart updates using Cassandra session 
instrumentation
       (Ease testing a behaviour not visible at the API level)
     -> The update limitation to not occur more than once er slice was not 
correctly applied
     -> Fix a couple of warnings
---
 .../cassandra/CassandraMailQueueMailDelete.java    | 13 +++++-----
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      | 29 +++++++++++++++++++++-
 2 files changed, 35 insertions(+), 7 deletions(-)

diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
index 1253dc11d4..b5ef41f2c9 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
@@ -93,12 +93,13 @@ public class CassandraMailQueueMailDelete {
     }
 
     private Mono<Instant> findNewBrowseStart(MailQueueName mailQueueName) {
-        Slice currentSlice = Slice.of(clock.instant());
+        Instant now= clock.instant();
         return browseStartDao.findBrowseStart(mailQueueName)
-            .filter(browseStart -> 
browseStart.isBefore(currentSlice.getStartSliceInstant()))
-            .flatMapMany(browseStart -> 
cassandraMailQueueBrowser.browseReferences(mailQueueName, browseStart))
-            .map(enqueuedItem -> 
enqueuedItem.getSlicingContext().getTimeRangeStart())
-            .next();
+            .filter(browseStart -> 
browseStart.isBefore(now.minus(configuration.getSliceWindow())))
+            .flatMap(browseStart -> 
cassandraMailQueueBrowser.browseReferences(mailQueueName, browseStart)
+                .map(enqueuedItem -> 
enqueuedItem.getSlicingContext().getTimeRangeStart())
+                .next()
+                .filter(newBrowseStart -> 
newBrowseStart.isAfter(browseStart)));
     }
 
     private Mono<Void> updateNewBrowseStart(MailQueueName mailQueueName, 
Instant newBrowseStartInstant) {
@@ -128,6 +129,6 @@ public class CassandraMailQueueMailDelete {
 
     private boolean shouldUpdateBrowseStart() {
         int threshold = configuration.getUpdateBrowseStartPace();
-        return Math.abs(ThreadLocalRandom.current().nextInt()) % threshold == 
0;
+        return ThreadLocalRandom.current().nextInt(threshold) % threshold == 0;
     }
 }
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 8fa34fa0d2..d466ac69d9 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -24,6 +24,7 @@ import static java.time.temporal.ChronoUnit.HOURS;
 import static 
org.apache.james.backends.cassandra.Scenario.Builder.executeNormally;
 import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
 import static org.apache.james.backends.cassandra.Scenario.Builder.returnEmpty;
+import static 
org.apache.james.backends.cassandra.StatementRecorder.Selector.preparedStatementStartingWith;
 import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
 import static org.apache.james.queue.api.Mails.defaultMail;
 import static org.apache.james.queue.api.Mails.defaultMailNoRecipient;
@@ -51,6 +52,7 @@ import java.util.stream.Stream;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.StatementRecorder;
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import 
org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
@@ -87,6 +89,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.mockito.ArgumentCaptor;
@@ -104,7 +107,7 @@ import reactor.rabbitmq.Sender;
 class RabbitMQMailQueueTest {
     private static final HashBlobId.Factory BLOB_ID_FACTORY = new 
HashBlobId.Factory();
     private static final int THREE_BUCKET_COUNT = 3;
-    private static final int UPDATE_BROWSE_START_PACE = 2;
+    private static final int UPDATE_BROWSE_START_PACE = 10;
     private static final Duration ONE_HOUR_SLICE_WINDOW = Duration.ofHours(1);
     private static final org.apache.james.queue.api.MailQueueName SPOOL = 
org.apache.james.queue.api.MailQueueName.of("spool");
     private static final Instant IN_SLICE_1 = 
Instant.parse("2007-12-03T10:15:30.00Z");
@@ -193,6 +196,30 @@ class RabbitMQMailQueueTest {
                 "5-1", "5-2", "5-3", "5-4", "5-5");
         }
 
+        @Test
+        void browseStartShouldBeUpdated(CassandraCluster cassandraCluster) {
+            int emailCount = 100;
+
+            StatementRecorder statementRecorder = new StatementRecorder();
+            cassandraCluster.getConf().recordStatements(statementRecorder);
+
+            clock.setInstant(IN_SLICE_1);
+            enqueueSomeMails(namePatternForSlice(1), emailCount);
+            dequeueMails(emailCount);
+
+            clock.setInstant(IN_SLICE_2);
+            enqueueSomeMails(namePatternForSlice(2), emailCount);
+            dequeueMails(emailCount);
+
+            clock.setInstant(IN_SLICE_3);
+            enqueueSomeMails(namePatternForSlice(3), emailCount);
+            dequeueMails(emailCount);
+
+            // The actual rate of update should actually be lower than the 
update probability.
+            
assertThat(statementRecorder.listExecutedStatements(preparedStatementStartingWith("UPDATE
 browsestart")))
+                .hasSizeBetween(2, 5);
+        }
+
         @Test
         void dequeueShouldDeleteBlobs(CassandraCluster cassandra) throws 
Exception {
             String name1 = "myMail1";


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to