This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 2763b2a05c1061a851311f81019f131f81617d67
Author: Benoit Tellier <btell...@linagora.com>
AuthorDate: Fri Jun 30 21:51:29 2023 +0700

    JAMES-3924 Review CassandraMailQueueBrowser parallelism
---
 .../rabbitmq/view/cassandra/CassandraMailQueueBrowser.java    | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
index 64f85be0b1..9c1241351d 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
@@ -134,7 +134,7 @@ public class CassandraMailQueueBrowser {
 
     Flux<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName, Instant browseStart) {
         return allSlicesStartingAt(browseStart)
-            .flatMapSequential(slice -> browseSlice(queueName, slice), 4);
+            .concatMap(slice -> browseSlice(queueName, slice));
     }
 
     private Mono<Pair<EnqueuedItem, Mail>> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
@@ -156,15 +156,14 @@ public class CassandraMailQueueBrowser {
     }
 
     private Flux<EnqueuedItemWithSlicingContext> browseSlice(MailQueueName queueName, Slice slice) {
-        return
-            allBucketIds()
-                .concatMap(bucketId -> browseBucket(queueName, slice, bucketId), DEFAULT_CONCURRENCY)
-                .sort(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
+        return allBucketIds()
+            .concatMap(bucketId -> browseBucket(queueName, slice, bucketId), 4)
+            .sort(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
     }
 
     private Flux<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) {
         return enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId)
-            .filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getEnqueueId()), DEFAULT_CONCURRENCY);
+            .filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getEnqueueId()), 4);
     }
 
     private Flux<Slice> allSlicesStartingAt(Instant browseStart) {


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to