This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 78da94b679bf327dd0272ba5b7dae7441a520057 Author: Rene Cordier <rcord...@linagora.com> AuthorDate: Fri Mar 1 11:04:27 2019 +0700 MAILBOX-382 fix concurrency test issue for redelivery of multiple events --- .../mailbox/events/MemoryEventDeadLetters.java | 45 ++++++++++++++-------- .../service/EventDeadLettersRedeliverTask.java | 7 ++-- .../webadmin/service/EventDeadLettersService.java | 9 +++-- 3 files changed, 37 insertions(+), 24 deletions(-) diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java index fcddc5e..fdeec7c 100644 --- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java +++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java @@ -21,8 +21,9 @@ package org.apache.james.mailbox.events; import com.google.common.base.Preconditions; import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -33,7 +34,7 @@ public class MemoryEventDeadLetters implements EventDeadLetters { private final Multimap<Group, Event> deadLetters; public MemoryEventDeadLetters() { - this.deadLetters = Multimaps.synchronizedSetMultimap(HashMultimap.create()); + this.deadLetters = HashMultimap.create(); } @Override @@ -41,9 +42,11 @@ public class MemoryEventDeadLetters implements EventDeadLetters { Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); Preconditions.checkArgument(failDeliveredEvent != null, FAIL_DELIVERED_EVENT_CANNOT_BE_NULL); - return Mono.fromRunnable(() -> deadLetters.put(registeredGroup, failDeliveredEvent)) - .subscribeWith(MonoProcessor.create()) - .then(); + synchronized (deadLetters) { + return Mono.fromRunnable(() -> deadLetters.put(registeredGroup, failDeliveredEvent)) + .subscribeWith(MonoProcessor.create()) + .then(); + } } @Override @@ -51,12 +54,14 @@ public class MemoryEventDeadLetters implements EventDeadLetters { Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL); - return Flux.fromIterable(deadLetters.get(registeredGroup)) - .filter(event -> event.getEventId().equals(failDeliveredEventId)) - .next() - .doOnNext(event -> deadLetters.remove(registeredGroup, event)) - .subscribeWith(MonoProcessor.create()) - .then(); + synchronized (deadLetters) { + return Flux.fromIterable(ImmutableList.copyOf(deadLetters.get(registeredGroup))) + .filter(event -> event.getEventId().equals(failDeliveredEventId)) + .next() + .doOnNext(event -> deadLetters.remove(registeredGroup, event)) + .subscribeWith(MonoProcessor.create()) + .then(); + } } @Override @@ -64,21 +69,27 @@ public class MemoryEventDeadLetters implements EventDeadLetters { Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL); - return Flux.fromIterable(deadLetters.get(registeredGroup)) - .filter(event -> event.getEventId().equals(failDeliveredEventId)) - .next(); + synchronized (deadLetters) { + return Flux.fromIterable(ImmutableList.copyOf(deadLetters.get(registeredGroup))) + .filter(event -> event.getEventId().equals(failDeliveredEventId)) + .next(); + } } @Override public Flux<Event.EventId> failedEventIds(Group registeredGroup) { Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); - return Flux.fromIterable(deadLetters.get(registeredGroup)) - .map(Event::getEventId); + synchronized (deadLetters) { + return Flux.fromIterable(ImmutableList.copyOf(deadLetters.get(registeredGroup))) + .map(Event::getEventId); + } } @Override public Flux<Group> groupsWithFailedEvents() { - return Flux.fromIterable(deadLetters.keySet()); + synchronized (deadLetters) { + return Flux.fromIterable(ImmutableSet.copyOf(deadLetters.keySet())); + } } } diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java index 7e8c456..ea5553c 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java @@ -21,6 +21,7 @@ package org.apache.james.webadmin.service; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import javax.inject.Inject; @@ -61,12 +62,12 @@ public class EventDeadLettersRedeliverTask implements Task { private final EventBus eventBus; private final EventDeadLetters deadLetters; - private final Flux<Tuple2<Group, Event>> groupsWithEvents; + private final Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents; private final AtomicLong successfulRedeliveriesCount; private final AtomicLong failedRedeliveriesCount; @Inject - EventDeadLettersRedeliverTask(EventBus eventBus, EventDeadLetters deadLetters, Flux<Tuple2<Group, Event>> groupsWithEvents) { + EventDeadLettersRedeliverTask(EventBus eventBus, EventDeadLetters deadLetters, Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents) { this.eventBus = eventBus; this.deadLetters = deadLetters; this.groupsWithEvents = groupsWithEvents; @@ -76,7 +77,7 @@ public class EventDeadLettersRedeliverTask implements Task { @Override public Result run() { - return groupsWithEvents.flatMap(entry -> redeliverGroupEvent(entry.getT1(), entry.getT2())) + return groupsWithEvents.get().flatMap(entry -> redeliverGroupEvent(entry.getT1(), entry.getT2())) .reduce(Result.COMPLETED, Task::combine) .onErrorResume(e -> { LOGGER.error("Error while redelivering events", e); diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java index ce2c915..e9ea848 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java @@ -21,6 +21,7 @@ package org.apache.james.webadmin.service; import java.util.List; import java.util.UUID; +import java.util.function.Supplier; import javax.inject.Inject; @@ -97,22 +98,22 @@ public class EventDeadLettersService { } public Task createActionOnEventsTask() { - Flux<Tuple2<Group, Event>> groupsWithEvents = listGroups().flatMap(this::getGroupWithEvents); + Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents = () -> listGroups().flatMap(this::getGroupWithEvents); return redeliverEvents(groupsWithEvents); } public Task createActionOnEventsTask(Group group) { - return redeliverEvents(getGroupWithEvents(group)); + return redeliverEvents(() -> getGroupWithEvents(group)); } public Task createActionOnEventsTask(Group group, Event.EventId eventId) { - Flux<Tuple2<Group, Event>> groupWithEvent = Flux.just(group).zipWith(getEvent(group, eventId)); + Supplier<Flux<Tuple2<Group, Event>>> groupWithEvent = () -> Flux.just(group).zipWith(getEvent(group, eventId)); return redeliverEvents(groupWithEvent); } - private Task redeliverEvents(Flux<Tuple2<Group, Event>> groupsWithEvents) { + private Task redeliverEvents(Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents) { return new EventDeadLettersRedeliverTask(eventBus, deadLetters, groupsWithEvents); } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org