This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 2763b2a05c1061a851311f81019f131f81617d67 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Fri Jun 30 21:51:29 2023 +0700 JAMES-3924 Review CassandraMailQueueBrowser parallelism --- .../rabbitmq/view/cassandra/CassandraMailQueueBrowser.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java index 64f85be0b1..9c1241351d 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java @@ -134,7 +134,7 @@ public class CassandraMailQueueBrowser { Flux<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName, Instant browseStart) { return allSlicesStartingAt(browseStart) - .flatMapSequential(slice -> browseSlice(queueName, slice), 4); + .concatMap(slice -> browseSlice(queueName, slice)); } private Mono<Pair<EnqueuedItem, Mail>> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) { @@ -156,15 +156,14 @@ public class CassandraMailQueueBrowser { } private Flux<EnqueuedItemWithSlicingContext> browseSlice(MailQueueName queueName, Slice slice) { - return - allBucketIds() - .concatMap(bucketId -> browseBucket(queueName, slice, bucketId), DEFAULT_CONCURRENCY) - .sort(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime())); + return allBucketIds() + .concatMap(bucketId -> browseBucket(queueName, slice, bucketId), 4) + .sort(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime())); } private Flux<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) { return enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId) - .filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getEnqueueId()), DEFAULT_CONCURRENCY); + .filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getEnqueueId()), 4); } private Flux<Slice> allSlicesStartingAt(Instant browseStart) { --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org