[james-project] 06/09: JAMES-3924 Review CassandraMailQueueBrowser parallelism
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch 3.8.x
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 660068a3e3da2e0ed604201d76f22ba2d8f18308
Author: Benoit Tellier
AuthorDate: Fri Jun 30 21:51:29 2023 +0700

    JAMES-3924 Review CassandraMailQueueBrowser parallelism
---
 .../rabbitmq/view/cassandra/CassandraMailQueueBrowser.java    | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
index 64f85be0b1..9c1241351d 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
@@ -134,7 +134,7 @@ public class CassandraMailQueueBrowser {

     Flux<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName, Instant browseStart) {
         return allSlicesStartingAt(browseStart)
-            .flatMapSequential(slice -> browseSlice(queueName, slice), 4);
+            .concatMap(slice -> browseSlice(queueName, slice));
     }

     private Mono<Pair<EnqueuedItem, Mail>> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
@@ -156,15 +156,14 @@ public class CassandraMailQueueBrowser {
     }

     private Flux<EnqueuedItemWithSlicingContext> browseSlice(MailQueueName queueName, Slice slice) {
-        return
-            allBucketIds()
-            .concatMap(bucketId -> browseBucket(queueName, slice, bucketId), DEFAULT_CONCURRENCY)
-            .sort(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
+        return allBucketIds()
+            .concatMap(bucketId -> browseBucket(queueName, slice, bucketId), 4)
+            .sort(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
     }

     private Flux<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) {
         return enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId)
-            .filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getEnqueueId()), DEFAULT_CONCURRENCY);
+            .filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getEnqueueId()), 4);
     }

     private Flux<Slice> allSlicesStartingAt(Instant browseStart) {

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org
[james-project] 06/09: JAMES-3924 Review CassandraMailQueueBrowser parallelism
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 2763b2a05c1061a851311f81019f131f81617d67
Author: Benoit Tellier
AuthorDate: Fri Jun 30 21:51:29 2023 +0700

    JAMES-3924 Review CassandraMailQueueBrowser parallelism
---
 .../rabbitmq/view/cassandra/CassandraMailQueueBrowser.java    | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
index 64f85be0b1..9c1241351d 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
@@ -134,7 +134,7 @@ public class CassandraMailQueueBrowser {

     Flux<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName, Instant browseStart) {
         return allSlicesStartingAt(browseStart)
-            .flatMapSequential(slice -> browseSlice(queueName, slice), 4);
+            .concatMap(slice -> browseSlice(queueName, slice));
     }

     private Mono<Pair<EnqueuedItem, Mail>> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
@@ -156,15 +156,14 @@ public class CassandraMailQueueBrowser {
     }

     private Flux<EnqueuedItemWithSlicingContext> browseSlice(MailQueueName queueName, Slice slice) {
-        return
-            allBucketIds()
-            .concatMap(bucketId -> browseBucket(queueName, slice, bucketId), DEFAULT_CONCURRENCY)
-            .sort(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
+        return allBucketIds()
+            .concatMap(bucketId -> browseBucket(queueName, slice, bucketId), 4)
+            .sort(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
     }

     private Flux<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) {
         return enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId)
-            .filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getEnqueueId()), DEFAULT_CONCURRENCY);
+            .filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getEnqueueId()), 4);
     }

     private Flux<Slice> allSlicesStartingAt(Instant browseStart) {

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org