This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 49018e5f9e8e5a80bce77095db1b027d7a857a90 Author: Rene Cordier <rcord...@linagora.com> AuthorDate: Fri Mar 1 15:57:42 2019 +0700 MAILBOX-382 moving business logic away from redelivery task to a new service --- .../service/EventDeadLettersRedeliverService.java | 67 ++++++++++++++++++++++ .../service/EventDeadLettersRedeliverTask.java | 37 +++++------- .../webadmin/service/EventDeadLettersService.java | 9 ++- .../routes/EventDeadLettersRoutesTest.java | 4 +- 4 files changed, 88 insertions(+), 29 deletions(-) diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java new file mode 100644 index 0000000..c62f55b --- /dev/null +++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java @@ -0,0 +1,67 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.webadmin.service; + +import java.util.function.Supplier; + +import org.apache.james.mailbox.events.Event; +import org.apache.james.mailbox.events.EventBus; +import org.apache.james.mailbox.events.EventDeadLetters; +import org.apache.james.mailbox.events.Group; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; + +public class EventDeadLettersRedeliverService { + enum RedeliverResult { + REDELIVER_SUCCESS, + REDELIVER_FAIL + } + + private static final Logger LOGGER = LoggerFactory.getLogger(EventDeadLettersRedeliverService.class); + + private final EventBus eventBus; + private final EventDeadLetters deadLetters; + + public EventDeadLettersRedeliverService(EventBus eventBus, EventDeadLetters deadLetters) { + this.eventBus = eventBus; + this.deadLetters = deadLetters; + } + + Flux<RedeliverResult> redeliverEvents(Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents) { + return groupsWithEvents.get().flatMap(entry -> redeliverGroupEvents(entry.getT1(), entry.getT2())); + } + + private Mono<RedeliverResult> redeliverGroupEvents(Group group, Event event) { + return eventBus.reDeliver(group, event) + .then(Mono.fromCallable(() -> { + deadLetters.remove(group, event.getEventId()); + return RedeliverResult.REDELIVER_SUCCESS; + })) + .onErrorResume(e -> { + LOGGER.error("Error while performing redelivery of event: {} for group: {}", + event.getEventId().toString(), group.asString(), e); + return Mono.just(RedeliverResult.REDELIVER_FAIL); + }); + } +} diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java index 6d38495..86fa2b3 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java @@ -19,13 +19,13 @@ package org.apache.james.webadmin.service; +import static org.apache.james.webadmin.service.EventDeadLettersRedeliverService.RedeliverResult; + import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import org.apache.james.mailbox.events.Event; -import org.apache.james.mailbox.events.EventBus; -import org.apache.james.mailbox.events.EventDeadLetters; import org.apache.james.mailbox.events.Group; import org.apache.james.task.Task; import org.apache.james.task.TaskExecutionDetails; @@ -33,7 +33,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; public class EventDeadLettersRedeliverTask implements Task { @@ -58,15 +57,13 @@ public class EventDeadLettersRedeliverTask implements Task { } } - private final EventBus eventBus; - private final EventDeadLetters deadLetters; + private final EventDeadLettersRedeliverService service; private final Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents; private final AtomicLong successfulRedeliveriesCount; private final AtomicLong failedRedeliveriesCount; - EventDeadLettersRedeliverTask(EventBus eventBus, EventDeadLetters deadLetters, Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents) { - this.eventBus = eventBus; - this.deadLetters = deadLetters; + EventDeadLettersRedeliverTask(EventDeadLettersRedeliverService service, Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents) { + this.service = service; this.groupsWithEvents = groupsWithEvents; this.successfulRedeliveriesCount = new AtomicLong(0L); this.failedRedeliveriesCount = new AtomicLong(0L); @@ -74,28 +71,22 @@ public class EventDeadLettersRedeliverTask implements Task { @Override public Result run() { - return groupsWithEvents.get().flatMap(entry -> redeliverGroupEvent(entry.getT1(), entry.getT2())) + return service.redeliverEvents(groupsWithEvents) + .map(this::updateCounters) .reduce(Result.COMPLETED, Task::combine) - .onErrorResume(e -> { - LOGGER.error("Error while redelivering events", e); - return Mono.just(Result.PARTIAL); - }) .block(); } - private Mono<Result> redeliverGroupEvent(Group group, Event event) { - return eventBus.reDeliver(group, event) - .then(Mono.fromCallable(() -> { - deadLetters.remove(group, event.getEventId()); + private Result updateCounters(RedeliverResult redeliverResult) { + switch (redeliverResult) { + case REDELIVER_SUCCESS: successfulRedeliveriesCount.incrementAndGet(); return Result.COMPLETED; - })) - .onErrorResume(e -> { - LOGGER.error("Error while performing redelivery of event: {} for group: {}", - event.getEventId().toString(), group.asString(), e); + case REDELIVER_FAIL: + default: failedRedeliveriesCount.incrementAndGet(); - return Mono.just(Result.PARTIAL); - }); + return Result.PARTIAL; + } } @Override diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java index 2ea4649..a0a6c3c 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java @@ -26,7 +26,6 @@ import java.util.function.Supplier; import javax.inject.Inject; import org.apache.james.mailbox.events.Event; -import org.apache.james.mailbox.events.EventBus; import org.apache.james.mailbox.events.EventDeadLetters; import org.apache.james.mailbox.events.Group; import org.apache.james.task.Task; @@ -38,13 +37,13 @@ import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; public class EventDeadLettersService { + private final EventDeadLettersRedeliverService redeliverService; private final EventDeadLetters deadLetters; - private final EventBus eventBus; @Inject - public EventDeadLettersService(EventDeadLetters deadLetters, EventBus eventBus) { + public EventDeadLettersService(EventDeadLettersRedeliverService redeliverService, EventDeadLetters deadLetters) { + this.redeliverService = redeliverService; this.deadLetters = deadLetters; - this.eventBus = eventBus; } public List<String> listGroupsAsStrings() { @@ -105,6 +104,6 @@ public class EventDeadLettersService { } private Task createRedeliverEventsTask(Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents) { - return new EventDeadLettersRedeliverTask(eventBus, deadLetters, groupsWithEvents); + return new EventDeadLettersRedeliverTask(redeliverService, groupsWithEvents); } } diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java index 9a55014..0ea37ce 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java @@ -52,6 +52,7 @@ import org.apache.james.metrics.logger.DefaultMetricFactory; import org.apache.james.task.MemoryTaskManager; import org.apache.james.webadmin.WebAdminServer; import org.apache.james.webadmin.WebAdminUtils; +import org.apache.james.webadmin.service.EventDeadLettersRedeliverService; import org.apache.james.webadmin.service.EventDeadLettersRedeliverTask; import org.apache.james.webadmin.service.EventDeadLettersService; import org.apache.james.webadmin.utils.ErrorResponder; @@ -111,7 +112,8 @@ class EventDeadLettersRoutesTest { JsonTransformer jsonTransformer = new JsonTransformer(); EventSerializer eventSerializer = new EventSerializer(new InMemoryId.Factory(), new InMemoryMessageId.Factory()); eventBus = new InVMEventBus(new InVmEventDelivery(new NoopMetricFactory()), RetryBackoffConfiguration.DEFAULT, deadLetters); - EventDeadLettersService service = new EventDeadLettersService(deadLetters, eventBus); + EventDeadLettersRedeliverService redeliverService = new EventDeadLettersRedeliverService(eventBus, deadLetters); + EventDeadLettersService service = new EventDeadLettersService(redeliverService, deadLetters); taskManager = new MemoryTaskManager(); webAdminServer = WebAdminUtils.createWebAdminServer( --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org