This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 49018e5f9e8e5a80bce77095db1b027d7a857a90
Author: Rene Cordier <rcord...@linagora.com>
AuthorDate: Fri Mar 1 15:57:42 2019 +0700

    MAILBOX-382 moving business logic away from redelivery task to a new service
---
 .../service/EventDeadLettersRedeliverService.java  | 67 ++++++++++++++++++++++
 .../service/EventDeadLettersRedeliverTask.java     | 37 +++++-------
 .../webadmin/service/EventDeadLettersService.java  |  9 ++-
 .../routes/EventDeadLettersRoutesTest.java         |  4 +-
 4 files changed, 88 insertions(+), 29 deletions(-)

diff --git 
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java
 
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java
new file mode 100644
index 0000000..c62f55b
--- /dev/null
+++ 
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java
@@ -0,0 +1,67 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import java.util.function.Supplier;
+
+import org.apache.james.mailbox.events.Event;
+import org.apache.james.mailbox.events.EventBus;
+import org.apache.james.mailbox.events.EventDeadLetters;
+import org.apache.james.mailbox.events.Group;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+
+public class EventDeadLettersRedeliverService {
+    enum RedeliverResult {
+        REDELIVER_SUCCESS,
+        REDELIVER_FAIL
+    }
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EventDeadLettersRedeliverService.class);
+
+    private final EventBus eventBus;
+    private final EventDeadLetters deadLetters;
+
+    public EventDeadLettersRedeliverService(EventBus eventBus, 
EventDeadLetters deadLetters) {
+        this.eventBus = eventBus;
+        this.deadLetters = deadLetters;
+    }
+
+    Flux<RedeliverResult> redeliverEvents(Supplier<Flux<Tuple2<Group, Event>>> 
groupsWithEvents) {
+        return groupsWithEvents.get().flatMap(entry -> 
redeliverGroupEvents(entry.getT1(), entry.getT2()));
+    }
+
+    private Mono<RedeliverResult> redeliverGroupEvents(Group group, Event 
event) {
+        return eventBus.reDeliver(group, event)
+            .then(Mono.fromCallable(() -> {
+                deadLetters.remove(group, event.getEventId());
+                return RedeliverResult.REDELIVER_SUCCESS;
+            }))
+            .onErrorResume(e -> {
+                LOGGER.error("Error while performing redelivery of event: {} 
for group: {}",
+                    event.getEventId().toString(), group.asString(), e);
+                return Mono.just(RedeliverResult.REDELIVER_FAIL);
+            });
+    }
+}
diff --git 
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java
 
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java
index 6d38495..86fa2b3 100644
--- 
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java
+++ 
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java
@@ -19,13 +19,13 @@
 
 package org.apache.james.webadmin.service;
 
+import static 
org.apache.james.webadmin.service.EventDeadLettersRedeliverService.RedeliverResult;
+
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 
 import org.apache.james.mailbox.events.Event;
-import org.apache.james.mailbox.events.EventBus;
-import org.apache.james.mailbox.events.EventDeadLetters;
 import org.apache.james.mailbox.events.Group;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskExecutionDetails;
@@ -33,7 +33,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
 
 public class EventDeadLettersRedeliverTask implements Task {
@@ -58,15 +57,13 @@ public class EventDeadLettersRedeliverTask implements Task {
         }
     }
 
-    private final EventBus eventBus;
-    private final EventDeadLetters deadLetters;
+    private final EventDeadLettersRedeliverService service;
     private final Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents;
     private final AtomicLong successfulRedeliveriesCount;
     private final AtomicLong failedRedeliveriesCount;
 
-    EventDeadLettersRedeliverTask(EventBus eventBus, EventDeadLetters 
deadLetters, Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents) {
-        this.eventBus = eventBus;
-        this.deadLetters = deadLetters;
+    EventDeadLettersRedeliverTask(EventDeadLettersRedeliverService service, 
Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents) {
+        this.service = service;
         this.groupsWithEvents = groupsWithEvents;
         this.successfulRedeliveriesCount = new AtomicLong(0L);
         this.failedRedeliveriesCount = new AtomicLong(0L);
@@ -74,28 +71,22 @@ public class EventDeadLettersRedeliverTask implements Task {
 
     @Override
     public Result run() {
-        return groupsWithEvents.get().flatMap(entry -> 
redeliverGroupEvent(entry.getT1(), entry.getT2()))
+        return service.redeliverEvents(groupsWithEvents)
+            .map(this::updateCounters)
             .reduce(Result.COMPLETED, Task::combine)
-            .onErrorResume(e -> {
-                LOGGER.error("Error while redelivering events", e);
-                return Mono.just(Result.PARTIAL);
-            })
             .block();
     }
 
-    private Mono<Result> redeliverGroupEvent(Group group, Event event) {
-        return eventBus.reDeliver(group, event)
-            .then(Mono.fromCallable(() -> {
-                deadLetters.remove(group, event.getEventId());
+    private Result updateCounters(RedeliverResult redeliverResult) {
+        switch (redeliverResult) {
+            case REDELIVER_SUCCESS:
                 successfulRedeliveriesCount.incrementAndGet();
                 return Result.COMPLETED;
-            }))
-            .onErrorResume(e -> {
-                LOGGER.error("Error while performing redelivery of event: {} 
for group: {}",
-                    event.getEventId().toString(), group.asString(), e);
+            case REDELIVER_FAIL:
+            default:
                 failedRedeliveriesCount.incrementAndGet();
-                return Mono.just(Result.PARTIAL);
-            });
+                return Result.PARTIAL;
+        }
     }
 
     @Override
diff --git 
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
 
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
index 2ea4649..a0a6c3c 100644
--- 
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
+++ 
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
@@ -26,7 +26,6 @@ import java.util.function.Supplier;
 import javax.inject.Inject;
 
 import org.apache.james.mailbox.events.Event;
-import org.apache.james.mailbox.events.EventBus;
 import org.apache.james.mailbox.events.EventDeadLetters;
 import org.apache.james.mailbox.events.Group;
 import org.apache.james.task.Task;
@@ -38,13 +37,13 @@ import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
 
 public class EventDeadLettersService {
+    private final EventDeadLettersRedeliverService redeliverService;
     private final EventDeadLetters deadLetters;
-    private final EventBus eventBus;
 
     @Inject
-    public EventDeadLettersService(EventDeadLetters deadLetters, EventBus 
eventBus) {
+    public EventDeadLettersService(EventDeadLettersRedeliverService 
redeliverService, EventDeadLetters deadLetters) {
+        this.redeliverService = redeliverService;
         this.deadLetters = deadLetters;
-        this.eventBus = eventBus;
     }
 
     public List<String> listGroupsAsStrings() {
@@ -105,6 +104,6 @@ public class EventDeadLettersService {
     }
 
     private Task createRedeliverEventsTask(Supplier<Flux<Tuple2<Group, 
Event>>> groupsWithEvents) {
-        return new EventDeadLettersRedeliverTask(eventBus, deadLetters, 
groupsWithEvents);
+        return new EventDeadLettersRedeliverTask(redeliverService, 
groupsWithEvents);
     }
 }
diff --git 
a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
 
b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
index 9a55014..0ea37ce 100644
--- 
a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
+++ 
b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
@@ -52,6 +52,7 @@ import org.apache.james.metrics.logger.DefaultMetricFactory;
 import org.apache.james.task.MemoryTaskManager;
 import org.apache.james.webadmin.WebAdminServer;
 import org.apache.james.webadmin.WebAdminUtils;
+import org.apache.james.webadmin.service.EventDeadLettersRedeliverService;
 import org.apache.james.webadmin.service.EventDeadLettersRedeliverTask;
 import org.apache.james.webadmin.service.EventDeadLettersService;
 import org.apache.james.webadmin.utils.ErrorResponder;
@@ -111,7 +112,8 @@ class EventDeadLettersRoutesTest {
         JsonTransformer jsonTransformer = new JsonTransformer();
         EventSerializer eventSerializer = new EventSerializer(new 
InMemoryId.Factory(), new InMemoryMessageId.Factory());
         eventBus = new InVMEventBus(new InVmEventDelivery(new 
NoopMetricFactory()), RetryBackoffConfiguration.DEFAULT, deadLetters);
-        EventDeadLettersService service = new 
EventDeadLettersService(deadLetters, eventBus);
+        EventDeadLettersRedeliverService redeliverService = new 
EventDeadLettersRedeliverService(eventBus, deadLetters);
+        EventDeadLettersService service = new 
EventDeadLettersService(redeliverService, deadLetters);
 
         taskManager = new MemoryTaskManager();
         webAdminServer = WebAdminUtils.createWebAdminServer(


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to