This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch 3.7.x in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 944e9a9970143247b72b15bd4aed9e0d3114abaa Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Wed May 3 11:25:28 2023 +0700 JAMES-3924 Test and fix browse start updates -> Propose unit tests for browseStart updates using Cassandra session instrumentation (Ease testing a behaviour not visible at the API level) -> The update limitation to not occur more than once er slice was not correctly applied -> Fix a couple of warnings --- .../cassandra/CassandraMailQueueMailDelete.java | 13 +++++----- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 29 +++++++++++++++++++++- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java index 1253dc11d4..b5ef41f2c9 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java @@ -93,12 +93,13 @@ public class CassandraMailQueueMailDelete { } private Mono<Instant> findNewBrowseStart(MailQueueName mailQueueName) { - Slice currentSlice = Slice.of(clock.instant()); + Instant now= clock.instant(); return browseStartDao.findBrowseStart(mailQueueName) - .filter(browseStart -> browseStart.isBefore(currentSlice.getStartSliceInstant())) - .flatMapMany(browseStart -> cassandraMailQueueBrowser.browseReferences(mailQueueName, browseStart)) - .map(enqueuedItem -> enqueuedItem.getSlicingContext().getTimeRangeStart()) - .next(); + .filter(browseStart -> browseStart.isBefore(now.minus(configuration.getSliceWindow()))) + .flatMap(browseStart -> cassandraMailQueueBrowser.browseReferences(mailQueueName, browseStart) + .map(enqueuedItem -> enqueuedItem.getSlicingContext().getTimeRangeStart()) + .next() + .filter(newBrowseStart -> newBrowseStart.isAfter(browseStart))); } private Mono<Void> updateNewBrowseStart(MailQueueName mailQueueName, Instant newBrowseStartInstant) { @@ -128,6 +129,6 @@ public class CassandraMailQueueMailDelete { private boolean shouldUpdateBrowseStart() { int threshold = configuration.getUpdateBrowseStartPace(); - return Math.abs(ThreadLocalRandom.current().nextInt()) % threshold == 0; + return ThreadLocalRandom.current().nextInt(threshold) % threshold == 0; } } diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index 8fa34fa0d2..d466ac69d9 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -24,6 +24,7 @@ import static java.time.temporal.ChronoUnit.HOURS; import static org.apache.james.backends.cassandra.Scenario.Builder.executeNormally; import static org.apache.james.backends.cassandra.Scenario.Builder.fail; import static org.apache.james.backends.cassandra.Scenario.Builder.returnEmpty; +import static org.apache.james.backends.cassandra.StatementRecorder.Selector.preparedStatementStartingWith; import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY; import static org.apache.james.queue.api.Mails.defaultMail; import static org.apache.james.queue.api.Mails.defaultMailNoRecipient; @@ -51,6 +52,7 @@ import java.util.stream.Stream; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.backends.cassandra.StatementRecorder; import org.apache.james.backends.cassandra.components.CassandraModule; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule; import org.apache.james.backends.rabbitmq.RabbitMQExtension; @@ -87,6 +89,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.ArgumentCaptor; @@ -104,7 +107,7 @@ import reactor.rabbitmq.Sender; class RabbitMQMailQueueTest { private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); private static final int THREE_BUCKET_COUNT = 3; - private static final int UPDATE_BROWSE_START_PACE = 2; + private static final int UPDATE_BROWSE_START_PACE = 10; private static final Duration ONE_HOUR_SLICE_WINDOW = Duration.ofHours(1); private static final org.apache.james.queue.api.MailQueueName SPOOL = org.apache.james.queue.api.MailQueueName.of("spool"); private static final Instant IN_SLICE_1 = Instant.parse("2007-12-03T10:15:30.00Z"); @@ -193,6 +196,30 @@ class RabbitMQMailQueueTest { "5-1", "5-2", "5-3", "5-4", "5-5"); } + @Test + void browseStartShouldBeUpdated(CassandraCluster cassandraCluster) { + int emailCount = 100; + + StatementRecorder statementRecorder = new StatementRecorder(); + cassandraCluster.getConf().recordStatements(statementRecorder); + + clock.setInstant(IN_SLICE_1); + enqueueSomeMails(namePatternForSlice(1), emailCount); + dequeueMails(emailCount); + + clock.setInstant(IN_SLICE_2); + enqueueSomeMails(namePatternForSlice(2), emailCount); + dequeueMails(emailCount); + + clock.setInstant(IN_SLICE_3); + enqueueSomeMails(namePatternForSlice(3), emailCount); + dequeueMails(emailCount); + + // The actual rate of update should actually be lower than the update probability. + assertThat(statementRecorder.listExecutedStatements(preparedStatementStartingWith("UPDATE browsestart"))) + .hasSizeBetween(2, 5); + } + @Test void dequeueShouldDeleteBlobs(CassandraCluster cassandra) throws Exception { String name1 = "myMail1"; --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org