[james-project] 06/09: JAMES-3924 Review CassandraMailQueueBrowser parallelism

2023-08-29 Thread btellier
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch 3.8.x
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 660068a3e3da2e0ed604201d76f22ba2d8f18308
Author: Benoit Tellier 
AuthorDate: Fri Jun 30 21:51:29 2023 +0700

JAMES-3924 Review CassandraMailQueueBrowser parallelism
---
 .../rabbitmq/view/cassandra/CassandraMailQueueBrowser.java | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
index 64f85be0b1..9c1241351d 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
@@ -134,7 +134,7 @@ public class CassandraMailQueueBrowser {
 
 Flux browseReferences(MailQueueName queueName, Instant browseStart) {
 return allSlicesStartingAt(browseStart)
-.flatMapSequential(slice -> browseSlice(queueName, slice), 4);
+.concatMap(slice -> browseSlice(queueName, slice));
 }
 
 private Mono> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
@@ -156,15 +156,14 @@ public class CassandraMailQueueBrowser {
 }
 
 private Flux browseSlice(MailQueueName queueName, Slice slice) {
-return
-allBucketIds()
-.concatMap(bucketId -> browseBucket(queueName, slice, bucketId), DEFAULT_CONCURRENCY)
-.sort(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
+return allBucketIds()
+.concatMap(bucketId -> browseBucket(queueName, slice, bucketId), 4)
+.sort(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
 }
 
 private Flux browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) {
 return enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId)
-.filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getEnqueueId()), DEFAULT_CONCURRENCY);
+.filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getEnqueueId()), 4);
 }
 
 private Flux allSlicesStartingAt(Instant browseStart) {


-
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org



[james-project] 06/09: JAMES-3924 Review CassandraMailQueueBrowser parallelism

2023-08-26 Thread btellier
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 2763b2a05c1061a851311f81019f131f81617d67
Author: Benoit Tellier 
AuthorDate: Fri Jun 30 21:51:29 2023 +0700

JAMES-3924 Review CassandraMailQueueBrowser parallelism
---
 .../rabbitmq/view/cassandra/CassandraMailQueueBrowser.java | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
index 64f85be0b1..9c1241351d 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
@@ -134,7 +134,7 @@ public class CassandraMailQueueBrowser {
 
 Flux browseReferences(MailQueueName queueName, Instant browseStart) {
 return allSlicesStartingAt(browseStart)
-.flatMapSequential(slice -> browseSlice(queueName, slice), 4);
+.concatMap(slice -> browseSlice(queueName, slice));
 }
 
 private Mono> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
@@ -156,15 +156,14 @@ public class CassandraMailQueueBrowser {
 }
 
 private Flux browseSlice(MailQueueName queueName, Slice slice) {
-return
-allBucketIds()
-.concatMap(bucketId -> browseBucket(queueName, slice, bucketId), DEFAULT_CONCURRENCY)
-.sort(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
+return allBucketIds()
+.concatMap(bucketId -> browseBucket(queueName, slice, bucketId), 4)
+.sort(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime()));
 }
 
 private Flux browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) {
 return enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId)
-.filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getEnqueueId()), DEFAULT_CONCURRENCY);
+.filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getEnqueueId()), 4);
 }
 
 private Flux allSlicesStartingAt(Instant browseStart) {


-
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org