This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 31e46437ff4b36852baed9479e7c45fabdd7591f
Author: Rene Cordier <rcord...@linagora.com>
AuthorDate: Mon Mar 4 12:00:05 2019 +0700

    MAILBOX-382 refactor event redelivery task
---
 .../service/EventDeadLettersRedeliverService.java  |  19 ++--
 .../service/EventDeadLettersRedeliverTask.java     |  54 +++++----
 .../webadmin/service/EventDeadLettersService.java  |  39 +------
 .../james/webadmin/service/EventRetriever.java     | 122 +++++++++++++++++++++
 .../routes/EventDeadLettersRoutesTest.java         |  28 +++--
 5 files changed, 188 insertions(+), 74 deletions(-)

diff --git 
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java
 
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java
index c62f55b..4ec62c9 100644
--- 
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java
+++ 
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverService.java
@@ -19,24 +19,18 @@
 
 package org.apache.james.webadmin.service;
 
-import java.util.function.Supplier;
-
 import org.apache.james.mailbox.events.Event;
 import org.apache.james.mailbox.events.EventBus;
 import org.apache.james.mailbox.events.EventDeadLetters;
 import org.apache.james.mailbox.events.Group;
+import org.apache.james.task.Task;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
 
 public class EventDeadLettersRedeliverService {
-    enum RedeliverResult {
-        REDELIVER_SUCCESS,
-        REDELIVER_FAIL
-    }
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(EventDeadLettersRedeliverService.class);
 
@@ -48,20 +42,21 @@ public class EventDeadLettersRedeliverService {
         this.deadLetters = deadLetters;
     }
 
-    Flux<RedeliverResult> redeliverEvents(Supplier<Flux<Tuple2<Group, Event>>> 
groupsWithEvents) {
-        return groupsWithEvents.get().flatMap(entry -> 
redeliverGroupEvents(entry.getT1(), entry.getT2()));
+    Flux<Task.Result> redeliverEvents(EventRetriever eventRetriever) {
+        return eventRetriever.retrieveEvents(deadLetters)
+            .flatMap(entry -> redeliverGroupEvents(entry.getT1(), 
entry.getT2()));
     }
 
-    private Mono<RedeliverResult> redeliverGroupEvents(Group group, Event 
event) {
+    private Mono<Task.Result> redeliverGroupEvents(Group group, Event event) {
         return eventBus.reDeliver(group, event)
             .then(Mono.fromCallable(() -> {
                 deadLetters.remove(group, event.getEventId());
-                return RedeliverResult.REDELIVER_SUCCESS;
+                return Task.Result.COMPLETED;
             }))
             .onErrorResume(e -> {
                 LOGGER.error("Error while performing redelivery of event: {} 
for group: {}",
                     event.getEventId().toString(), group.asString(), e);
-                return Mono.just(RedeliverResult.REDELIVER_FAIL);
+                return Mono.just(Task.Result.PARTIAL);
             });
     }
 }
diff --git 
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java
 
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java
index 86fa2b3..b863fbf 100644
--- 
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java
+++ 
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java
@@ -19,33 +19,31 @@
 
 package org.apache.james.webadmin.service;
 
-import static 
org.apache.james.webadmin.service.EventDeadLettersRedeliverService.RedeliverResult;
-
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
 
 import org.apache.james.mailbox.events.Event;
 import org.apache.james.mailbox.events.Group;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskExecutionDetails;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import reactor.core.publisher.Flux;
-import reactor.util.function.Tuple2;
+import com.fasterxml.jackson.annotation.JsonInclude;
 
 public class EventDeadLettersRedeliverTask implements Task {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(EventDeadLettersRedeliverTask.class);
     public static final String TYPE = "eventDeadLettersRedeliverTask";
 
     public static class AdditionalInformation implements 
TaskExecutionDetails.AdditionalInformation {
         private final long successfulRedeliveriesCount;
         private final long failedRedeliveriesCount;
+        private final Optional<Group> group;
+        private final Optional<Event.EventId> eventId;
 
-        AdditionalInformation(long successfulRedeliveriesCount, long 
failedRedeliveriesCount) {
+        AdditionalInformation(long successfulRedeliveriesCount, long 
failedRedeliveriesCount,
+                              Optional<Group> group, Optional<Event.EventId> 
eventId) {
             this.successfulRedeliveriesCount = successfulRedeliveriesCount;
             this.failedRedeliveriesCount = failedRedeliveriesCount;
+            this.group = group;
+            this.eventId = eventId;
         }
 
         public long getSuccessfulRedeliveriesCount() {
@@ -55,38 +53,50 @@ public class EventDeadLettersRedeliverTask implements Task {
         public long getFailedRedeliveriesCount() {
             return failedRedeliveriesCount;
         }
+
+        @JsonInclude(JsonInclude.Include.NON_ABSENT)
+        public Optional<String> getGroup() {
+            return group.map(Group::asString);
+        }
+
+        @JsonInclude(JsonInclude.Include.NON_ABSENT)
+        public Optional<String> getEventId() {
+            return eventId.map(eventId -> eventId.getId().toString());
+        }
     }
 
     private final EventDeadLettersRedeliverService service;
-    private final Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents;
+    private final EventRetriever eventRetriever;
     private final AtomicLong successfulRedeliveriesCount;
     private final AtomicLong failedRedeliveriesCount;
 
-    EventDeadLettersRedeliverTask(EventDeadLettersRedeliverService service, 
Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents) {
+    EventDeadLettersRedeliverTask(EventDeadLettersRedeliverService service, 
EventRetriever eventRetriever) {
         this.service = service;
-        this.groupsWithEvents = groupsWithEvents;
+        this.eventRetriever = eventRetriever;
         this.successfulRedeliveriesCount = new AtomicLong(0L);
         this.failedRedeliveriesCount = new AtomicLong(0L);
     }
 
     @Override
     public Result run() {
-        return service.redeliverEvents(groupsWithEvents)
+        return service.redeliverEvents(eventRetriever)
             .map(this::updateCounters)
             .reduce(Result.COMPLETED, Task::combine)
             .block();
     }
 
-    private Result updateCounters(RedeliverResult redeliverResult) {
-        switch (redeliverResult) {
-            case REDELIVER_SUCCESS:
+    private Result updateCounters(Result result) {
+        switch (result) {
+            case COMPLETED:
                 successfulRedeliveriesCount.incrementAndGet();
-                return Result.COMPLETED;
-            case REDELIVER_FAIL:
-            default:
+                break;
+            case PARTIAL:
                 failedRedeliveriesCount.incrementAndGet();
-                return Result.PARTIAL;
+                break;
+            default:
+                throw new RuntimeException("Result case: " + result.toString() 
+ " not recognized");
         }
+        return result;
     }
 
     @Override
@@ -102,6 +112,8 @@ public class EventDeadLettersRedeliverTask implements Task {
     AdditionalInformation createAdditionalInformation() {
         return new AdditionalInformation(
             successfulRedeliveriesCount.get(),
-            failedRedeliveriesCount.get());
+            failedRedeliveriesCount.get(),
+            eventRetriever.forGroup(),
+            eventRetriever.forEvent());
     }
 }
diff --git 
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
 
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
index a0a6c3c..27bd042 100644
--- 
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
+++ 
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java
@@ -21,7 +21,6 @@ package org.apache.james.webadmin.service;
 
 import java.util.List;
 import java.util.UUID;
-import java.util.function.Supplier;
 
 import javax.inject.Inject;
 
@@ -32,9 +31,7 @@ import org.apache.james.task.Task;
 
 import com.github.steveash.guavate.Guavate;
 
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
 
 public class EventDeadLettersService {
     private final EventDeadLettersRedeliverService redeliverService;
@@ -47,33 +44,20 @@ public class EventDeadLettersService {
     }
 
     public List<String> listGroupsAsStrings() {
-        return listGroups()
+        return deadLetters.groupsWithFailedEvents()
             .map(Group::asString)
             .collect(Guavate.toImmutableList())
             .block();
     }
 
-    private Flux<Group> listGroups() {
-        return deadLetters.groupsWithFailedEvents();
-    }
-
     public List<String> listGroupsEventIdsAsStrings(Group group) {
-        return listGroupEventIds(group)
+        return deadLetters.failedEventIds(group)
             .map(Event.EventId::getId)
             .map(UUID::toString)
             .collect(Guavate.toImmutableList())
             .block();
     }
 
-    private Flux<Event.EventId> listGroupEventIds(Group group) {
-        return deadLetters.failedEventIds(group);
-    }
-
-    private Flux<Event> listGroupEvents(Group group) {
-        return listGroupEventIds(group)
-            .flatMap(eventId -> getEvent(group, eventId));
-    }
-
     public Mono<Event> getEvent(Group group, Event.EventId eventId) {
         return deadLetters.failedEvent(group, eventId);
     }
@@ -82,28 +66,15 @@ public class EventDeadLettersService {
         deadLetters.remove(group, eventId).block();
     }
 
-    private Flux<Tuple2<Group, Event>> getGroupWithEvents(Group group) {
-        return listGroupEvents(group)
-            .flatMap(event -> Flux.zip(Mono.just(group), Mono.just(event)));
-    }
-
     public Task redeliverAllEvents() {
-        Supplier<Flux<Tuple2<Group, Event>>> groupsWithEvents = () -> 
listGroups().flatMap(this::getGroupWithEvents);
-
-        return createRedeliverEventsTask(groupsWithEvents);
+        return new EventDeadLettersRedeliverTask(redeliverService, 
EventRetriever.allEvents());
     }
 
     public Task redeliverGroupEvents(Group group) {
-        return createRedeliverEventsTask(() -> getGroupWithEvents(group));
+        return new EventDeadLettersRedeliverTask(redeliverService, 
EventRetriever.groupEvents(group));
     }
 
     public Task redeliverSingleEvent(Group group, Event.EventId eventId) {
-        Supplier<Flux<Tuple2<Group, Event>>> groupWithEvent = () -> 
Flux.just(group).zipWith(getEvent(group, eventId));
-
-        return createRedeliverEventsTask(groupWithEvent);
-    }
-
-    private Task createRedeliverEventsTask(Supplier<Flux<Tuple2<Group, 
Event>>> groupsWithEvents) {
-        return new EventDeadLettersRedeliverTask(redeliverService, 
groupsWithEvents);
+        return new EventDeadLettersRedeliverTask(redeliverService, 
EventRetriever.singleEvent(group, eventId));
     }
 }
diff --git 
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventRetriever.java
 
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventRetriever.java
new file mode 100644
index 0000000..a780e8f
--- /dev/null
+++ 
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventRetriever.java
@@ -0,0 +1,122 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.service;
+
+import java.util.Optional;
+
+import org.apache.james.mailbox.events.Event;
+import org.apache.james.mailbox.events.EventDeadLetters;
+import org.apache.james.mailbox.events.Group;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+
+public interface EventRetriever {
+    static EventRetriever allEvents() {
+        return new AllEventsRetriever();
+    }
+
+    static EventRetriever groupEvents(Group group) {
+        return new GroupEventsRetriever(group);
+    }
+
+    static EventRetriever singleEvent(Group group, Event.EventId eventId) {
+        return new SingleEventRetriever(group, eventId);
+    }
+
+    Optional<Group> forGroup();
+
+    Optional<Event.EventId> forEvent();
+
+    Flux<Tuple2<Group, Event>> retrieveEvents(EventDeadLetters deadLetters);
+
+    default Flux<Tuple2<Group, Event>> listGroupEvents(EventDeadLetters 
deadLetters, Group group) {
+        return deadLetters.failedEventIds(group)
+            .flatMap(eventId -> deadLetters.failedEvent(group, eventId))
+            .flatMap(event -> Flux.zip(Mono.just(group), Mono.just(event)));
+    }
+
+    class AllEventsRetriever implements EventRetriever {
+        @Override
+        public Optional<Group> forGroup() {
+            return Optional.empty();
+        }
+
+        @Override
+        public Optional<Event.EventId> forEvent() {
+            return Optional.empty();
+        }
+
+        @Override
+        public Flux<Tuple2<Group, Event>> retrieveEvents(EventDeadLetters 
deadLetters) {
+            return deadLetters.groupsWithFailedEvents()
+                .flatMap(group -> listGroupEvents(deadLetters, group));
+        }
+    }
+
+    class GroupEventsRetriever implements EventRetriever {
+        private final Group group;
+
+        GroupEventsRetriever(Group group) {
+            this.group = group;
+        }
+
+        @Override
+        public Optional<Group> forGroup() {
+            return Optional.of(group);
+        }
+
+        @Override
+        public Optional<Event.EventId> forEvent() {
+            return Optional.empty();
+        }
+
+        @Override
+        public Flux<Tuple2<Group, Event>> retrieveEvents(EventDeadLetters 
deadLetters) {
+            return listGroupEvents(deadLetters, group);
+        }
+    }
+
+    class SingleEventRetriever implements EventRetriever {
+        private final Group group;
+        private final Event.EventId eventId;
+
+        SingleEventRetriever(Group group, Event.EventId eventId) {
+            this.group = group;
+            this.eventId = eventId;
+        }
+
+        @Override
+        public Optional<Group> forGroup() {
+            return Optional.of(group);
+        }
+
+        @Override
+        public Optional<Event.EventId> forEvent() {
+            return Optional.of(eventId);
+        }
+
+        @Override
+        public Flux<Tuple2<Group, Event>> retrieveEvents(EventDeadLetters 
deadLetters) {
+            return Flux.just(group).zipWith(deadLetters.failedEvent(group, 
eventId));
+        }
+    }
+}
diff --git 
a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
 
b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
index 0cbf9b5..f6c3a60 100644
--- 
a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
+++ 
b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java
@@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 
 import org.apache.james.core.User;
 import org.apache.james.event.json.EventSerializer;
@@ -100,7 +101,6 @@ class EventDeadLettersRoutesTest {
         "  }" +
         "}";
     private static final String SERIALIZED_GROUP_A = new 
EventBusTestFixture.GroupA().asString();
-    private static final String SERIALIZED_GROUP_B = new 
EventBusTestFixture.GroupB().asString();
 
     private WebAdminServer webAdminServer;
     private EventDeadLetters deadLetters;
@@ -404,6 +404,8 @@ class EventDeadLettersRoutesTest {
                 .body("taskId", is(taskId))
                 .body("additionalInformation.successfulRedeliveriesCount", 
is(1))
                 .body("additionalInformation.failedRedeliveriesCount", is(0))
+                .body("additionalInformation.group", is(nullValue()))
+                .body("additionalInformation.eventId", is(nullValue()))
                 .body("type", is(EventDeadLettersRedeliverTask.TYPE))
                 .body("startedDate", is(notNullValue()))
                 .body("submitDate", is(notNullValue()))
@@ -589,6 +591,8 @@ class EventDeadLettersRoutesTest {
                 .body("taskId", is(taskId))
                 .body("additionalInformation.successfulRedeliveriesCount", 
is(1))
                 .body("additionalInformation.failedRedeliveriesCount", is(0))
+                .body("additionalInformation.group", is(SERIALIZED_GROUP_A))
+                .body("additionalInformation.eventId", is(nullValue()))
                 .body("type", is(EventDeadLettersRedeliverTask.TYPE))
                 .body("startedDate", is(notNullValue()))
                 .body("submitDate", is(notNullValue()))
@@ -612,7 +616,8 @@ class EventDeadLettersRoutesTest {
             .then()
                 .body("status", is("completed"))
                 .body("additionalInformation.successfulRedeliveriesCount", 
is(1))
-                .body("additionalInformation.failedRedeliveriesCount", is(0));
+                .body("additionalInformation.failedRedeliveriesCount", is(0))
+                .body("additionalInformation.group", is(SERIALIZED_GROUP_A));
 
             when()
                 .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + 
UUID_1)
@@ -637,7 +642,8 @@ class EventDeadLettersRoutesTest {
             .then()
                 .body("status", is("completed"))
                 .body("additionalInformation.successfulRedeliveriesCount", 
is(1))
-                .body("additionalInformation.failedRedeliveriesCount", is(0));
+                .body("additionalInformation.failedRedeliveriesCount", is(0))
+                .body("additionalInformation.group", is(SERIALIZED_GROUP_A));
 
             assertThat(eventCollector.getEvents()).hasSize(1);
         }
@@ -660,7 +666,8 @@ class EventDeadLettersRoutesTest {
             .then()
                 .body("status", is("completed"))
                 .body("additionalInformation.successfulRedeliveriesCount", 
is(2))
-                .body("additionalInformation.failedRedeliveriesCount", is(0));
+                .body("additionalInformation.failedRedeliveriesCount", is(0))
+                .body("additionalInformation.group", is(SERIALIZED_GROUP_A));
 
             when()
                 .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A)
@@ -688,7 +695,8 @@ class EventDeadLettersRoutesTest {
             .then()
                 .body("status", is("completed"))
                 .body("additionalInformation.successfulRedeliveriesCount", 
is(2))
-                .body("additionalInformation.failedRedeliveriesCount", is(0));
+                .body("additionalInformation.failedRedeliveriesCount", is(0))
+                .body("additionalInformation.group", is(SERIALIZED_GROUP_A));
 
             assertThat(eventCollector.getEvents()).hasSize(2);
         }
@@ -785,6 +793,8 @@ class EventDeadLettersRoutesTest {
                 .body("taskId", is(taskId))
                 .body("additionalInformation.successfulRedeliveriesCount", 
is(1))
                 .body("additionalInformation.failedRedeliveriesCount", is(0))
+                .body("additionalInformation.group", is(SERIALIZED_GROUP_A))
+                .body("additionalInformation.eventId", is(UUID_1))
                 .body("type", is(EventDeadLettersRedeliverTask.TYPE))
                 .body("startedDate", is(notNullValue()))
                 .body("submitDate", is(notNullValue()))
@@ -808,7 +818,9 @@ class EventDeadLettersRoutesTest {
             .then()
                 .body("status", is("completed"))
                 .body("additionalInformation.successfulRedeliveriesCount", 
is(1))
-                .body("additionalInformation.failedRedeliveriesCount", is(0));
+                .body("additionalInformation.failedRedeliveriesCount", is(0))
+                .body("additionalInformation.group", is(SERIALIZED_GROUP_A))
+                .body("additionalInformation.eventId", is(UUID_1));
 
             when()
                 .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + 
UUID_1)
@@ -833,7 +845,9 @@ class EventDeadLettersRoutesTest {
             .then()
                 .body("status", is("completed"))
                 .body("additionalInformation.successfulRedeliveriesCount", 
is(1))
-                .body("additionalInformation.failedRedeliveriesCount", is(0));
+                .body("additionalInformation.failedRedeliveriesCount", is(0))
+                .body("additionalInformation.group", is(SERIALIZED_GROUP_A))
+                .body("additionalInformation.eventId", is(UUID_1));
 
             assertThat(eventCollector.getEvents()).hasSize(1);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to