This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 78da94b679bf327dd0272ba5b7dae7441a520057
Author: Rene Cordier <rcord...@linagora.com>
AuthorDate: Fri Mar 1 11:04:27 2019 +0700

    MAILBOX-382 fix concurrency test issue for redelivery of multiple events
---
 .../mailbox/events/MemoryEventDeadLetters.java     | 45 ++++++++++++++--------
 .../service/EventDeadLettersRedeliverTask.java     |  7 ++--
 .../webadmin/service/EventDeadLettersService.java  |  9 +++--
 3 files changed, 37 insertions(+), 24 deletions(-)

diff --git 
a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
 
b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
index fcddc5e..fdeec7c 100644
--- 
a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
+++ 
b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
@@ -21,8 +21,9 @@ package org.apache.james.mailbox.events;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -33,7 +34,7 @@ public class MemoryEventDeadLetters implements 
EventDeadLetters {
     private final Multimap<Group, Event> deadLetters;
 
     public MemoryEventDeadLetters() {
-        this.deadLetters = 
Multimaps.synchronizedSetMultimap(HashMultimap.create());
+        this.deadLetters = HashMultimap.create();
     }
 
     @Override
@@ -41,9 +42,11 @@ public class MemoryEventDeadLetters implements 
EventDeadLetters {
         Preconditions.checkArgument(registeredGroup != null, 
REGISTERED_GROUP_CANNOT_BE_NULL);
         Preconditions.checkArgument(failDeliveredEvent != null, 
FAIL_DELIVERED_EVENT_CANNOT_BE_NULL);
 
-        return Mono.fromRunnable(() -> deadLetters.put(registeredGroup, 
failDeliveredEvent))
-            .subscribeWith(MonoProcessor.create())
-            .then();
+        synchronized (deadLetters) {
+            return Mono.fromRunnable(() -> deadLetters.put(registeredGroup, 
failDeliveredEvent))
+                .subscribeWith(MonoProcessor.create())
+                .then();
+        }
     }
 
     @Override
@@ -51,12 +54,14 @@ public class MemoryEventDeadLetters implements 
EventDeadLetters {
         Preconditions.checkArgument(registeredGroup != null, 
REGISTERED_GROUP_CANNOT_BE_NULL);
         Preconditions.checkArgument(failDeliveredEventId != null, 
FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL);
 
-        return Flux.fromIterable(deadLetters.get(registeredGroup))
-            .filter(event -> event.getEventId().equals(failDeliveredEventId))
-            .next()
-            .doOnNext(event -> deadLetters.remove(registeredGroup, event))
-            .subscribeWith(MonoProcessor.create())
-            .then();
+        synchronized (deadLetters) {
+            return 
Flux.fromIterable(ImmutableList.copyOf(deadLetters.get(registeredGroup)))
+                .filter(event -> 
event.getEventId().equals(failDeliveredEventId))
+                .next()
+                .doOnNext(event -> deadLetters.remove(registeredGroup, event))
+                .subscribeWith(MonoProcessor.create())
+                .then();
+        }
     }
 
     @Override
@@ -64,21 +69,27 @@ public class MemoryEventDeadLetters implements 
EventDeadLetters {
         Preconditions.checkArgument(registeredGroup != null, 
REGISTERED_GROUP_CANNOT_BE_NULL);
         Preconditions.checkArgument(failDeliveredEventId != null, 
FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL);
 
-        return Flux.fromIterable(deadLetters.get(registeredGroup))
-            .filter(event -> event.getEventId().equals(failDeliveredEventId))
-            .next();
+        synchronized (deadLetters) {
+            return 
Flux.fromIterable(ImmutableList.copyOf(deadLetters.get(registeredGroup)))
+                .filter(event -> 
event.getEventId().equals(failDeliveredEventId))
+                .next();
+        }
     }
 
     @Override
     public Flux<Event.EventId> failedEventIds(Group registeredGroup) {
         Preconditions.checkArgument(registeredGroup != null, 
REGISTERED_GROUP_CANNOT_BE_NULL);
 
-        return Flux.fromIterable(deadLetters.get(registeredGroup))
-            .map(Event::getEventId);
+        synchronized (deadLetters) {
+            return 
Flux.fromIterable(ImmutableList.copyOf(deadLetters.get(registeredGroup)))
+                .map(Event::getEventId);
+        }
     }
 
     @Override
     public Flux<Group> groupsWithFailedEvents() {
-        return Flux.fromIterable(deadLetters.keySet());
+        synchronized (deadLetters) {
+            return 
Flux.fromIterable(ImmutableSet.copyOf(deadLetters.keySet()));
+        }
     }
 }
diff --git 
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java
 
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java
index 7e8c456..ea5553c 100644
--- 
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java
+++ 
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java
@@ -21,6 +21,7 @@ package org.apache.james.webadmin.service;
 
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
 import javax.inject.Inject;
 
@@ -61,12 +62,12 @@ public class EventDeadLettersRedeliverTask implements Task {
 
     private final EventBus eventBus;
     private final EventDeadLetters deadLetters;
-    private final Flux<Tuple2<Group, Event>> groupsWithEvents;
+    private final Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents;
     private final AtomicLong successfulRedeliveriesCount;
     private final AtomicLong failedRedeliveriesCount;
 
     @Inject
-    EventDeadLettersRedeliverTask(EventBus eventBus, EventDeadLetters 
deadLetters, Flux<Tuple2<Group, Event>> groupsWithEvents) {
+    EventDeadLettersRedeliverTask(EventBus eventBus, EventDeadLetters 
deadLetters, Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents) {
         this.eventBus = eventBus;
         this.deadLetters = deadLetters;
         this.groupsWithEvents = groupsWithEvents;
@@ -76,7 +77,7 @@ public class EventDeadLettersRedeliverTask implements Task {
 
     @Override
     public Result run() {
-        return groupsWithEvents.flatMap(entry -> 
redeliverGroupEvent(entry.getT1(), entry.getT2()))
+        return groupsWithEvents.get().flatMap(entry -> 
redeliverGroupEvent(entry.getT1(), entry.getT2()))
             .reduce(Result.COMPLETED, Task::combine)
             .onErrorResume(e -> {
                 LOGGER.error("Error while redelivering events", e);
diff --git 
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
 
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
index ce2c915..e9ea848 100644
--- 
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
+++ 
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
@@ -21,6 +21,7 @@ package org.apache.james.webadmin.service;
 
 import java.util.List;
 import java.util.UUID;
+import java.util.function.Supplier;
 
 import javax.inject.Inject;
 
@@ -97,22 +98,22 @@ public class EventDeadLettersService {
     }
 
     public Task createActionOnEventsTask() {
-        Flux<Tuple2<Group, Event>> groupsWithEvents = 
listGroups().flatMap(this::getGroupWithEvents);
+        Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents = () -> 
listGroups().flatMap(this::getGroupWithEvents);
 
         return redeliverEvents(groupsWithEvents);
     }
 
     public Task createActionOnEventsTask(Group group) {
-        return redeliverEvents(getGroupWithEvents(group));
+        return redeliverEvents(() -> getGroupWithEvents(group));
     }
 
     public Task createActionOnEventsTask(Group group, Event.EventId eventId) {
-        Flux<Tuple2<Group, Event>> groupWithEvent = 
Flux.just(group).zipWith(getEvent(group, eventId));
+        Supplier<Flux<Tuple2<Group, Event>>> groupWithEvent = () -> 
Flux.just(group).zipWith(getEvent(group, eventId));
 
         return redeliverEvents(groupWithEvent);
     }
 
-    private Task redeliverEvents(Flux<Tuple2<Group, Event>> groupsWithEvents) {
+    private Task redeliverEvents(Supplier<Flux<Tuple2<Group, Event>>> 
groupsWithEvents) {
         return new EventDeadLettersRedeliverTask(eventBus, deadLetters, 
groupsWithEvents);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to