This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 31e46437ff4b36852baed9479e7c45fabdd7591f Author: Rene Cordier <rcord...@linagora.com> AuthorDate: Mon Mar 4 12:00:05 2019 +0700 MAILBOX-382 refactor event redelivery task --- .../service/EventDeadLettersRedeliverService.java | 19 ++-- .../service/EventDeadLettersRedeliverTask.java | 54 +++++---- .../webadmin/service/EventDeadLettersService.java | 39 +------ .../james/webadmin/service/EventRetriever.java | 122 +++++++++++++++++++++ .../routes/EventDeadLettersRoutesTest.java | 28 +++-- 5 files changed, 188 insertions(+), 74 deletions(-) diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java index c62f55b..4ec62c9 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java @@ -19,24 +19,18 @@ package org.apache.james.webadmin.service; -import java.util.function.Supplier; - import org.apache.james.mailbox.events.Event; import org.apache.james.mailbox.events.EventBus; import org.apache.james.mailbox.events.EventDeadLetters; import org.apache.james.mailbox.events.Group; +import org.apache.james.task.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.util.function.Tuple2; public class EventDeadLettersRedeliverService { - enum RedeliverResult { - REDELIVER_SUCCESS, - REDELIVER_FAIL - } private static final Logger LOGGER = LoggerFactory.getLogger(EventDeadLettersRedeliverService.class); @@ -48,20 +42,21 @@ public class EventDeadLettersRedeliverService { this.deadLetters = deadLetters; } - Flux<RedeliverResult> redeliverEvents(Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents) { - return groupsWithEvents.get().flatMap(entry -> redeliverGroupEvents(entry.getT1(), entry.getT2())); + Flux<Task.Result> redeliverEvents(EventRetriever eventRetriever) { + return eventRetriever.retrieveEvents(deadLetters) + .flatMap(entry -> redeliverGroupEvents(entry.getT1(), entry.getT2())); } - private Mono<RedeliverResult> redeliverGroupEvents(Group group, Event event) { + private Mono<Task.Result> redeliverGroupEvents(Group group, Event event) { return eventBus.reDeliver(group, event) .then(Mono.fromCallable(() -> { deadLetters.remove(group, event.getEventId()); - return RedeliverResult.REDELIVER_SUCCESS; + return Task.Result.COMPLETED; })) .onErrorResume(e -> { LOGGER.error("Error while performing redelivery of event: {} for group: {}", event.getEventId().toString(), group.asString(), e); - return Mono.just(RedeliverResult.REDELIVER_FAIL); + return Mono.just(Task.Result.PARTIAL); }); } } diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java index 86fa2b3..b863fbf 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java @@ -19,33 +19,31 @@ package org.apache.james.webadmin.service; -import static org.apache.james.webadmin.service.EventDeadLettersRedeliverService.RedeliverResult; - import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; import org.apache.james.mailbox.events.Event; import org.apache.james.mailbox.events.Group; import org.apache.james.task.Task; import org.apache.james.task.TaskExecutionDetails; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; -import reactor.util.function.Tuple2; +import com.fasterxml.jackson.annotation.JsonInclude; public class EventDeadLettersRedeliverTask implements Task { - private static final Logger LOGGER = LoggerFactory.getLogger(EventDeadLettersRedeliverTask.class); public static final String TYPE = "eventDeadLettersRedeliverTask"; public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation { private final long successfulRedeliveriesCount; private final long failedRedeliveriesCount; + private final Optional<Group> group; + private final Optional<Event.EventId> eventId; - AdditionalInformation(long successfulRedeliveriesCount, long failedRedeliveriesCount) { + AdditionalInformation(long successfulRedeliveriesCount, long failedRedeliveriesCount, + Optional<Group> group, Optional<Event.EventId> eventId) { this.successfulRedeliveriesCount = successfulRedeliveriesCount; this.failedRedeliveriesCount = failedRedeliveriesCount; + this.group = group; + this.eventId = eventId; } public long getSuccessfulRedeliveriesCount() { @@ -55,38 +53,50 @@ public class EventDeadLettersRedeliverTask implements Task { public long getFailedRedeliveriesCount() { return failedRedeliveriesCount; } + + @JsonInclude(JsonInclude.Include.NON_ABSENT) + public Optional<String> getGroup() { + return group.map(Group::asString); + } + + @JsonInclude(JsonInclude.Include.NON_ABSENT) + public Optional<String> getEventId() { + return eventId.map(eventId -> eventId.getId().toString()); + } } private final EventDeadLettersRedeliverService service; - private final Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents; + private final EventRetriever eventRetriever; private final AtomicLong successfulRedeliveriesCount; private final AtomicLong failedRedeliveriesCount; - EventDeadLettersRedeliverTask(EventDeadLettersRedeliverService service, Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents) { + EventDeadLettersRedeliverTask(EventDeadLettersRedeliverService service, EventRetriever eventRetriever) { this.service = service; - this.groupsWithEvents = groupsWithEvents; + this.eventRetriever = eventRetriever; this.successfulRedeliveriesCount = new AtomicLong(0L); this.failedRedeliveriesCount = new AtomicLong(0L); } @Override public Result run() { - return service.redeliverEvents(groupsWithEvents) + return service.redeliverEvents(eventRetriever) .map(this::updateCounters) .reduce(Result.COMPLETED, Task::combine) .block(); } - private Result updateCounters(RedeliverResult redeliverResult) { - switch (redeliverResult) { - case REDELIVER_SUCCESS: + private Result updateCounters(Result result) { + switch (result) { + case COMPLETED: successfulRedeliveriesCount.incrementAndGet(); - return Result.COMPLETED; - case REDELIVER_FAIL: - default: + break; + case PARTIAL: failedRedeliveriesCount.incrementAndGet(); - return Result.PARTIAL; + break; + default: + throw new RuntimeException("Result case: " + result.toString() + " not recognized"); } + return result; } @Override @@ -102,6 +112,8 @@ public class EventDeadLettersRedeliverTask implements Task { AdditionalInformation createAdditionalInformation() { return new AdditionalInformation( successfulRedeliveriesCount.get(), - failedRedeliveriesCount.get()); + failedRedeliveriesCount.get(), + eventRetriever.forGroup(), + eventRetriever.forEvent()); } } diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java index a0a6c3c..27bd042 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java @@ -21,7 +21,6 @@ package org.apache.james.webadmin.service; import java.util.List; import java.util.UUID; -import java.util.function.Supplier; import javax.inject.Inject; @@ -32,9 +31,7 @@ import org.apache.james.task.Task; import com.github.steveash.guavate.Guavate; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.util.function.Tuple2; public class EventDeadLettersService { private final EventDeadLettersRedeliverService redeliverService; @@ -47,33 +44,20 @@ public class EventDeadLettersService { } public List<String> listGroupsAsStrings() { - return listGroups() + return deadLetters.groupsWithFailedEvents() .map(Group::asString) .collect(Guavate.toImmutableList()) .block(); } - private Flux<Group> listGroups() { - return deadLetters.groupsWithFailedEvents(); - } - public List<String> listGroupsEventIdsAsStrings(Group group) { - return listGroupEventIds(group) + return deadLetters.failedEventIds(group) .map(Event.EventId::getId) .map(UUID::toString) .collect(Guavate.toImmutableList()) .block(); } - private Flux<Event.EventId> listGroupEventIds(Group group) { - return deadLetters.failedEventIds(group); - } - - private Flux<Event> listGroupEvents(Group group) { - return listGroupEventIds(group) - .flatMap(eventId -> getEvent(group, eventId)); - } - public Mono<Event> getEvent(Group group, Event.EventId eventId) { return deadLetters.failedEvent(group, eventId); } @@ -82,28 +66,15 @@ public class EventDeadLettersService { deadLetters.remove(group, eventId).block(); } - private Flux<Tuple2<Group, Event>> getGroupWithEvents(Group group) { - return listGroupEvents(group) - .flatMap(event -> Flux.zip(Mono.just(group), Mono.just(event))); - } - public Task redeliverAllEvents() { - Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents = () -> listGroups().flatMap(this::getGroupWithEvents); - - return createRedeliverEventsTask(groupsWithEvents); + return new EventDeadLettersRedeliverTask(redeliverService, EventRetriever.allEvents()); } public Task redeliverGroupEvents(Group group) { - return createRedeliverEventsTask(() -> getGroupWithEvents(group)); + return new EventDeadLettersRedeliverTask(redeliverService, EventRetriever.groupEvents(group)); } public Task redeliverSingleEvent(Group group, Event.EventId eventId) { - Supplier<Flux<Tuple2<Group, Event>>> groupWithEvent = () -> Flux.just(group).zipWith(getEvent(group, eventId)); - - return createRedeliverEventsTask(groupWithEvent); - } - - private Task createRedeliverEventsTask(Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents) { - return new EventDeadLettersRedeliverTask(redeliverService, groupsWithEvents); + return new EventDeadLettersRedeliverTask(redeliverService, EventRetriever.singleEvent(group, eventId)); } } diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventRetriever.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventRetriever.java new file mode 100644 index 0000000..a780e8f --- /dev/null +++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventRetriever.java @@ -0,0 +1,122 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.webadmin.service; + +import java.util.Optional; + +import org.apache.james.mailbox.events.Event; +import org.apache.james.mailbox.events.EventDeadLetters; +import org.apache.james.mailbox.events.Group; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; + +public interface EventRetriever { + static EventRetriever allEvents() { + return new AllEventsRetriever(); + } + + static EventRetriever groupEvents(Group group) { + return new GroupEventsRetriever(group); + } + + static EventRetriever singleEvent(Group group, Event.EventId eventId) { + return new SingleEventRetriever(group, eventId); + } + + Optional<Group> forGroup(); + + Optional<Event.EventId> forEvent(); + + Flux<Tuple2<Group, Event>> retrieveEvents(EventDeadLetters deadLetters); + + default Flux<Tuple2<Group, Event>> listGroupEvents(EventDeadLetters deadLetters, Group group) { + return deadLetters.failedEventIds(group) + .flatMap(eventId -> deadLetters.failedEvent(group, eventId)) + .flatMap(event -> Flux.zip(Mono.just(group), Mono.just(event))); + } + + class AllEventsRetriever implements EventRetriever { + @Override + public Optional<Group> forGroup() { + return Optional.empty(); + } + + @Override + public Optional<Event.EventId> forEvent() { + return Optional.empty(); + } + + @Override + public Flux<Tuple2<Group, Event>> retrieveEvents(EventDeadLetters deadLetters) { + return deadLetters.groupsWithFailedEvents() + .flatMap(group -> listGroupEvents(deadLetters, group)); + } + } + + class GroupEventsRetriever implements EventRetriever { + private final Group group; + + GroupEventsRetriever(Group group) { + this.group = group; + } + + @Override + public Optional<Group> forGroup() { + return Optional.of(group); + } + + @Override + public Optional<Event.EventId> forEvent() { + return Optional.empty(); + } + + @Override + public Flux<Tuple2<Group, Event>> retrieveEvents(EventDeadLetters deadLetters) { + return listGroupEvents(deadLetters, group); + } + } + + class SingleEventRetriever implements EventRetriever { + private final Group group; + private final Event.EventId eventId; + + SingleEventRetriever(Group group, Event.EventId eventId) { + this.group = group; + this.eventId = eventId; + } + + @Override + public Optional<Group> forGroup() { + return Optional.of(group); + } + + @Override + public Optional<Event.EventId> forEvent() { + return Optional.of(eventId); + } + + @Override + public Flux<Tuple2<Group, Event>> retrieveEvents(EventDeadLetters deadLetters) { + return Flux.just(group).zipWith(deadLetters.failedEvent(group, eventId)); + } + } +} diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java index 0cbf9b5..f6c3a60 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java @@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import org.apache.james.core.User; import org.apache.james.event.json.EventSerializer; @@ -100,7 +101,6 @@ class EventDeadLettersRoutesTest { " }" + "}"; private static final String SERIALIZED_GROUP_A = new EventBusTestFixture.GroupA().asString(); - private static final String SERIALIZED_GROUP_B = new EventBusTestFixture.GroupB().asString(); private WebAdminServer webAdminServer; private EventDeadLetters deadLetters; @@ -404,6 +404,8 @@ class EventDeadLettersRoutesTest { .body("taskId", is(taskId)) .body("additionalInformation.successfulRedeliveriesCount", is(1)) .body("additionalInformation.failedRedeliveriesCount", is(0)) + .body("additionalInformation.group", is(nullValue())) + .body("additionalInformation.eventId", is(nullValue())) .body("type", is(EventDeadLettersRedeliverTask.TYPE)) .body("startedDate", is(notNullValue())) .body("submitDate", is(notNullValue())) @@ -589,6 +591,8 @@ class EventDeadLettersRoutesTest { .body("taskId", is(taskId)) .body("additionalInformation.successfulRedeliveriesCount", is(1)) .body("additionalInformation.failedRedeliveriesCount", is(0)) + .body("additionalInformation.group", is(SERIALIZED_GROUP_A)) + .body("additionalInformation.eventId", is(nullValue())) .body("type", is(EventDeadLettersRedeliverTask.TYPE)) .body("startedDate", is(notNullValue())) .body("submitDate", is(notNullValue())) @@ -612,7 +616,8 @@ class EventDeadLettersRoutesTest { .then() .body("status", is("completed")) .body("additionalInformation.successfulRedeliveriesCount", is(1)) - .body("additionalInformation.failedRedeliveriesCount", is(0)); + .body("additionalInformation.failedRedeliveriesCount", is(0)) + .body("additionalInformation.group", is(SERIALIZED_GROUP_A)); when() .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_1) @@ -637,7 +642,8 @@ class EventDeadLettersRoutesTest { .then() .body("status", is("completed")) .body("additionalInformation.successfulRedeliveriesCount", is(1)) - .body("additionalInformation.failedRedeliveriesCount", is(0)); + .body("additionalInformation.failedRedeliveriesCount", is(0)) + .body("additionalInformation.group", is(SERIALIZED_GROUP_A)); assertThat(eventCollector.getEvents()).hasSize(1); } @@ -660,7 +666,8 @@ class EventDeadLettersRoutesTest { .then() .body("status", is("completed")) .body("additionalInformation.successfulRedeliveriesCount", is(2)) - .body("additionalInformation.failedRedeliveriesCount", is(0)); + .body("additionalInformation.failedRedeliveriesCount", is(0)) + .body("additionalInformation.group", is(SERIALIZED_GROUP_A)); when() .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A) @@ -688,7 +695,8 @@ class EventDeadLettersRoutesTest { .then() .body("status", is("completed")) .body("additionalInformation.successfulRedeliveriesCount", is(2)) - .body("additionalInformation.failedRedeliveriesCount", is(0)); + .body("additionalInformation.failedRedeliveriesCount", is(0)) + .body("additionalInformation.group", is(SERIALIZED_GROUP_A)); assertThat(eventCollector.getEvents()).hasSize(2); } @@ -785,6 +793,8 @@ class EventDeadLettersRoutesTest { .body("taskId", is(taskId)) .body("additionalInformation.successfulRedeliveriesCount", is(1)) .body("additionalInformation.failedRedeliveriesCount", is(0)) + .body("additionalInformation.group", is(SERIALIZED_GROUP_A)) + .body("additionalInformation.eventId", is(UUID_1)) .body("type", is(EventDeadLettersRedeliverTask.TYPE)) .body("startedDate", is(notNullValue())) .body("submitDate", is(notNullValue())) @@ -808,7 +818,9 @@ class EventDeadLettersRoutesTest { .then() .body("status", is("completed")) .body("additionalInformation.successfulRedeliveriesCount", is(1)) - .body("additionalInformation.failedRedeliveriesCount", is(0)); + .body("additionalInformation.failedRedeliveriesCount", is(0)) + .body("additionalInformation.group", is(SERIALIZED_GROUP_A)) + .body("additionalInformation.eventId", is(UUID_1)); when() .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_1) @@ -833,7 +845,9 @@ class EventDeadLettersRoutesTest { .then() .body("status", is("completed")) .body("additionalInformation.successfulRedeliveriesCount", is(1)) - .body("additionalInformation.failedRedeliveriesCount", is(0)); + .body("additionalInformation.failedRedeliveriesCount", is(0)) + .body("additionalInformation.group", is(SERIALIZED_GROUP_A)) + .body("additionalInformation.eventId", is(UUID_1)); assertThat(eventCollector.getEvents()).hasSize(1); } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org