This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4b0b9fc973f0887523b770091e60713b072aabcc Author: Rene Cordier <rcord...@linagora.com> AuthorDate: Wed Feb 27 15:12:48 2019 +0700 MAILBOX-382 Implement new routes on webadmin for redelivery, new service and new task --- .../apache/james/webadmin/dto/ActionEvents.java | 36 ++ .../webadmin/routes/EventDeadLettersRoutes.java | 140 +++++- .../service/EventDeadLettersRedeliverTask.java | 118 +++++ .../webadmin/service/EventDeadLettersService.java | 64 ++- .../james/webadmin/dto/ActionEventsTest.java | 48 ++ .../routes/EventDeadLettersRoutesTest.java | 507 ++++++++++++++++++++- 6 files changed, 880 insertions(+), 33 deletions(-) diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/dto/ActionEvents.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/dto/ActionEvents.java new file mode 100644 index 0000000..f18ad7d --- /dev/null +++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/dto/ActionEvents.java @@ -0,0 +1,36 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.webadmin.dto; + +import java.util.Arrays; + +import com.google.common.base.Preconditions; + +public enum ActionEvents { + reDeliver; + + public static ActionEvents parse(String action) { + Preconditions.checkArgument(action != null, "'action' url parameter is mandatory"); + return Arrays.stream(ActionEvents.values()) + .filter(element -> element.toString().equalsIgnoreCase(action)) + .findAny() + .orElseThrow(() -> new IllegalArgumentException("'" + action + "' is not a valid action query parameter")); + } +} diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/routes/EventDeadLettersRoutes.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/routes/EventDeadLettersRoutes.java index 9c01615..a838625 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/routes/EventDeadLettersRoutes.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/routes/EventDeadLettersRoutes.java @@ -24,25 +24,30 @@ import java.util.List; import javax.inject.Inject; import javax.ws.rs.DELETE; import javax.ws.rs.GET; +import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; import org.apache.james.mailbox.events.Event; import org.apache.james.mailbox.events.Group; +import org.apache.james.task.Task; +import org.apache.james.task.TaskId; +import org.apache.james.task.TaskManager; import org.apache.james.webadmin.Routes; +import org.apache.james.webadmin.dto.ActionEvents; +import org.apache.james.webadmin.dto.TaskIdDto; import org.apache.james.webadmin.service.EventDeadLettersService; import org.apache.james.webadmin.utils.ErrorResponder; import org.apache.james.webadmin.utils.JsonTransformer; import org.eclipse.jetty.http.HttpStatus; -import com.github.steveash.guavate.Guavate; - import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.ResponseHeader; import spark.Request; import spark.Response; import spark.Service; @@ -58,11 +63,13 @@ public class EventDeadLettersRoutes implements Routes { private static final String INTERNAL_SERVER_ERROR = "Internal server error - Something went bad on the server side."; private final EventDeadLettersService eventDeadLettersService; + private final TaskManager taskManager; private final JsonTransformer jsonTransformer; @Inject - EventDeadLettersRoutes(EventDeadLettersService eventDeadLettersService, JsonTransformer jsonTransformer) { + EventDeadLettersRoutes(EventDeadLettersService eventDeadLettersService, TaskManager taskManager, JsonTransformer jsonTransformer) { this.eventDeadLettersService = eventDeadLettersService; + this.taskManager = taskManager; this.jsonTransformer = jsonTransformer; } @@ -73,10 +80,42 @@ public class EventDeadLettersRoutes implements Routes { @Override public void define(Service service) { + service.post(BASE_PATH, this::performActionOnAllEvents, jsonTransformer); service.get(BASE_PATH + "/groups", this::listGroups, jsonTransformer); service.get(BASE_PATH + "/groups/" + GROUP_PARAM, this::listFailedEvents, jsonTransformer); + service.post(BASE_PATH + "/groups/" + GROUP_PARAM, this::performActionOnGroupEvents, jsonTransformer); service.get(BASE_PATH + "/groups/" + GROUP_PARAM + "/" + EVENT_ID_PARAM, this::getEventDetails); service.delete(BASE_PATH + "/groups/" + GROUP_PARAM + "/" + EVENT_ID_PARAM, this::deleteEvent); + service.post(BASE_PATH + "/groups/" + GROUP_PARAM + "/" + EVENT_ID_PARAM, this::performActionOnSingleEvent, jsonTransformer); + } + + @POST + @Path("") + @ApiOperation(value = "Performing operations on all events") + @ApiImplicitParams({ + @ApiImplicitParam( + required = true, + dataType = "String", + name = "action", + paramType = "query", + example = "?action=reDeliver", + value = "Specify the action to perform on all events. For now only 'reDeliver' is supported as an action, " + + "and its purpose is to attempt a redelivery of all events present in dead letter."), + }) + @ApiResponses(value = { + @ApiResponse(code = HttpStatus.CREATED_201, message = "The taskId of the given scheduled task", response = TaskIdDto.class, + responseHeaders = { + @ResponseHeader(name = "Location", description = "URL of the resource associated with the scheduled task") + }), + @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "Invalid action argument"), + @ApiResponse(code = HttpStatus.INTERNAL_SERVER_ERROR_500, message = INTERNAL_SERVER_ERROR) + }) + public TaskIdDto performActionOnAllEvents(Request request, Response response) { + assertValidActionParameter(request); + + Task task = eventDeadLettersService.createActionOnEventsTask(); + TaskId taskId = taskManager.submit(task); + return TaskIdDto.respond(response, taskId); } @GET @@ -87,7 +126,7 @@ public class EventDeadLettersRoutes implements Routes { @ApiResponse(code = HttpStatus.INTERNAL_SERVER_ERROR_500, message = INTERNAL_SERVER_ERROR) }) private Iterable<String> listGroups(Request request, Response response) { - return eventDeadLettersService.listGroups(); + return eventDeadLettersService.listGroupsAsStrings(); } @GET @@ -109,7 +148,44 @@ public class EventDeadLettersRoutes implements Routes { }) private Iterable<String> listFailedEvents(Request request, Response response) { Group group = parseGroup(request); - return eventDeadLettersService.listGroupEvents(group); + return eventDeadLettersService.listGroupsEventIdsAsStrings(group); + } + + @POST + @Path("/groups/" + GROUP_PARAM) + @ApiOperation(value = "Performing operations on events of a particular group") + @ApiImplicitParams({ + @ApiImplicitParam( + required = true, + name = "group", + paramType = "path parameter", + dataType = "String", + defaultValue = "none", + value = "Compulsory. Needs to be a valid group name"), + @ApiImplicitParam( + required = true, + dataType = "String", + name = "action", + paramType = "query", + example = "?action=reDeliver", + value = "Specify the action to perform on all events of a particular group. For now only 'reDeliver' is supported as an action, " + + "and its purpose is to attempt a redelivery of all events present in dead letter for the specified group."), + }) + @ApiResponses(value = { + @ApiResponse(code = HttpStatus.CREATED_201, message = "The taskId of the given scheduled task", response = TaskIdDto.class, + responseHeaders = { + @ResponseHeader(name = "Location", description = "URL of the resource associated with the scheduled task") + }), + @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "Invalid group name or action argument"), + @ApiResponse(code = HttpStatus.INTERNAL_SERVER_ERROR_500, message = INTERNAL_SERVER_ERROR) + }) + public TaskIdDto performActionOnGroupEvents(Request request, Response response) { + Group group = parseGroup(request); + assertValidActionParameter(request); + + Task task = eventDeadLettersService.createActionOnEventsTask(group); + TaskId taskId = taskManager.submit(task); + return TaskIdDto.respond(response, taskId); } @GET @@ -177,6 +253,60 @@ public class EventDeadLettersRoutes implements Routes { return response; } + @POST + @Path("/groups/" + GROUP_PARAM + "/" + EVENT_ID_PARAM) + @ApiOperation(value = "Performing operations on an event") + @ApiImplicitParams({ + @ApiImplicitParam( + required = true, + name = "group", + paramType = "path parameter", + dataType = "String", + defaultValue = "none", + value = "Compulsory. Needs to be a valid group name"), + @ApiImplicitParam( + required = true, + name = "eventId", + paramType = "path parameter", + dataType = "String", + defaultValue = "none", + value = "Compulsory. Needs to be a valid eventId"), + @ApiImplicitParam( + required = true, + dataType = "String", + name = "action", + paramType = "query", + example = "?action=reDeliver", + value = "Specify the action to perform on an unique event. For now only 'reDeliver' is supported as an action, " + + "and its purpose is to attempt a redelivery of the specified event."), + }) + @ApiResponses(value = { + @ApiResponse(code = HttpStatus.CREATED_201, message = "The taskId of the given scheduled task", response = TaskIdDto.class, + responseHeaders = { + @ResponseHeader(name = "Location", description = "URL of the resource associated with the scheduled task") + }), + @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "Invalid group name, event id or action argument"), + @ApiResponse(code = HttpStatus.NOT_FOUND_404, message = "No event with this eventId"), + @ApiResponse(code = HttpStatus.INTERNAL_SERVER_ERROR_500, message = INTERNAL_SERVER_ERROR) + }) + public TaskIdDto performActionOnSingleEvent(Request request, Response response) { + Group group = parseGroup(request); + Event.EventId eventId = parseEventId(request); + assertValidActionParameter(request); + + Task task = eventDeadLettersService.createActionOnEventsTask(group, eventId); + TaskId taskId = taskManager.submit(task); + return TaskIdDto.respond(response, taskId); + } + + private void assertValidActionParameter(Request request) { + ActionEvents action = ActionEvents.parse(request.queryParams("action")); + + if (action != ActionEvents.reDeliver) { + throw new IllegalArgumentException(action + " is not a supported action"); + } + } + private Group parseGroup(Request request) { String groupAsString = request.params(GROUP_PARAM); try { diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java new file mode 100644 index 0000000..7e8c456 --- /dev/null +++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersRedeliverTask.java @@ -0,0 +1,118 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.webadmin.service; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; + +import javax.inject.Inject; + +import org.apache.james.mailbox.events.Event; +import org.apache.james.mailbox.events.EventBus; +import org.apache.james.mailbox.events.EventDeadLetters; +import org.apache.james.mailbox.events.Group; +import org.apache.james.task.Task; +import org.apache.james.task.TaskExecutionDetails; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; + +public class EventDeadLettersRedeliverTask implements Task { + private static final Logger LOGGER = LoggerFactory.getLogger(EventDeadLettersRedeliverTask.class); + public static final String TYPE = "eventDeadLettersRedeliverTask"; + + public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation { + private final long successfulRedeliveriesCount; + private final long failedRedeliveriesCount; + + AdditionalInformation(long successfulRedeliveriesCount, long failedRedeliveriesCount) { + this.successfulRedeliveriesCount = successfulRedeliveriesCount; + this.failedRedeliveriesCount = failedRedeliveriesCount; + } + + public long getSuccessfulRedeliveriesCount() { + return successfulRedeliveriesCount; + } + + public long getFailedRedeliveriesCount() { + return failedRedeliveriesCount; + } + } + + private final EventBus eventBus; + private final EventDeadLetters deadLetters; + private final Flux<Tuple2<Group, Event>> groupsWithEvents; + private final AtomicLong successfulRedeliveriesCount; + private final AtomicLong failedRedeliveriesCount; + + @Inject + EventDeadLettersRedeliverTask(EventBus eventBus, EventDeadLetters deadLetters, Flux<Tuple2<Group, Event>> groupsWithEvents) { + this.eventBus = eventBus; + this.deadLetters = deadLetters; + this.groupsWithEvents = groupsWithEvents; + this.successfulRedeliveriesCount = new AtomicLong(0L); + this.failedRedeliveriesCount = new AtomicLong(0L); + } + + @Override + public Result run() { + return groupsWithEvents.flatMap(entry -> redeliverGroupEvent(entry.getT1(), entry.getT2())) + .reduce(Result.COMPLETED, Task::combine) + .onErrorResume(e -> { + LOGGER.error("Error while redelivering events", e); + return Mono.just(Result.PARTIAL); + }) + .block(); + } + + private Mono<Result> redeliverGroupEvent(Group group, Event event) { + return eventBus.reDeliver(group, event) + .then(Mono.fromCallable(() -> { + deadLetters.remove(group, event.getEventId()); + successfulRedeliveriesCount.incrementAndGet(); + return Result.COMPLETED; + })) + .onErrorResume(e -> { + LOGGER.error("Error while performing redelivery of event: {} for group: {}", + event.getEventId().toString(), group.asString(), e); + failedRedeliveriesCount.incrementAndGet(); + return Mono.just(Result.PARTIAL); + }); + } + + @Override + public String type() { + return TYPE; + } + + @Override + public Optional<TaskExecutionDetails.AdditionalInformation> details() { + return Optional.of(createAdditionalInformation()); + } + + AdditionalInformation createAdditionalInformation() { + return new AdditionalInformation( + successfulRedeliveriesCount.get(), + failedRedeliveriesCount.get()); + } +} diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java index 5f9b1ae..ce2c915 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/EventDeadLettersService.java @@ -19,50 +19,100 @@ package org.apache.james.webadmin.service; +import java.util.List; import java.util.UUID; import javax.inject.Inject; import org.apache.james.event.json.EventSerializer; import org.apache.james.mailbox.events.Event; +import org.apache.james.mailbox.events.EventBus; import org.apache.james.mailbox.events.EventDeadLetters; import org.apache.james.mailbox.events.Group; +import org.apache.james.task.Task; import com.github.steveash.guavate.Guavate; -import com.google.common.collect.ImmutableList; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; public class EventDeadLettersService { private final EventDeadLetters deadLetters; + private final EventBus eventBus; private final EventSerializer eventSerializer; @Inject - public EventDeadLettersService(EventDeadLetters deadLetters, EventSerializer eventSerializer) { + public EventDeadLettersService(EventDeadLetters deadLetters, EventBus eventBus, EventSerializer eventSerializer) { this.deadLetters = deadLetters; + this.eventBus = eventBus; this.eventSerializer = eventSerializer; } - public ImmutableList<String> listGroups() { - return deadLetters.groupsWithFailedEvents() + public List<String> listGroupsAsStrings() { + return listGroups() .map(Group::asString) .collect(Guavate.toImmutableList()) .block(); } - public ImmutableList<String> listGroupEvents(Group group) { - return deadLetters.failedEventIds(group) + private Flux<Group> listGroups() { + return deadLetters.groupsWithFailedEvents(); + } + + public List<String> listGroupsEventIdsAsStrings(Group group) { + return listGroupEventIds(group) .map(Event.EventId::getId) .map(UUID::toString) .collect(Guavate.toImmutableList()) .block(); } + private Flux<Event.EventId> listGroupEventIds(Group group) { + return deadLetters.failedEventIds(group); + } + + private Flux<Event> listGroupEvents(Group group) { + return listGroupEventIds(group) + .flatMap(eventId -> getEvent(group, eventId)); + } + public String getSerializedEvent(Group group, Event.EventId eventId) { - return deadLetters.failedEvent(group, eventId) + return getEvent(group, eventId) .map(eventSerializer::toJson) .block(); } + private Mono<Event> getEvent(Group group, Event.EventId eventId) { + return deadLetters.failedEvent(group, eventId); + } + public void deleteEvent(Group group, Event.EventId eventId) { deadLetters.remove(group, eventId).block(); } + + private Flux<Tuple2<Group, Event>> getGroupWithEvents(Group group) { + return listGroupEvents(group) + .flatMap(event -> Flux.zip(Mono.just(group), Mono.just(event))); + } + + public Task createActionOnEventsTask() { + Flux<Tuple2<Group, Event>> groupsWithEvents = listGroups().flatMap(this::getGroupWithEvents); + + return redeliverEvents(groupsWithEvents); + } + + public Task createActionOnEventsTask(Group group) { + return redeliverEvents(getGroupWithEvents(group)); + } + + public Task createActionOnEventsTask(Group group, Event.EventId eventId) { + Flux<Tuple2<Group, Event>> groupWithEvent = Flux.just(group).zipWith(getEvent(group, eventId)); + + return redeliverEvents(groupWithEvent); + } + + private Task redeliverEvents(Flux<Tuple2<Group, Event>> groupsWithEvents) { + return new EventDeadLettersRedeliverTask(eventBus, deadLetters, groupsWithEvents); + } } diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/dto/ActionEventsTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/dto/ActionEventsTest.java new file mode 100644 index 0000000..9980815 --- /dev/null +++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/dto/ActionEventsTest.java @@ -0,0 +1,48 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.webadmin.dto; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.junit.jupiter.api.Test; + +class ActionEventsTest { + private static final String ACTION = "reDeliver"; + + @Test + void parseShouldSucceedWithCorrectActionEventsArgument() { + assertThat(ActionEvents.parse(ACTION)).isEqualTo(ActionEvents.reDeliver); + } + + @Test + void parseShouldFailWithIncorrectActionEventsArgument() { + assertThatThrownBy(() -> ActionEvents.parse("incorrect-action")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'incorrect-action' is not a valid action query parameter"); + } + + @Test + void parseShouldFailWithMissingActionEventsArgument() { + assertThatThrownBy(() -> ActionEvents.parse(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("'action' url parameter is mandatory"); + } +} diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java index e4e8a31..c099607 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/EventDeadLettersRoutesTest.java @@ -19,30 +19,41 @@ package org.apache.james.webadmin.routes; +import static io.restassured.RestAssured.given; import static io.restassured.RestAssured.when; import static io.restassured.RestAssured.with; import static net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson; import static org.apache.james.webadmin.WebAdminServer.NO_CONFIGURATION; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; import org.apache.james.core.User; import org.apache.james.event.json.EventSerializer; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.events.Event; +import org.apache.james.mailbox.events.EventBus; import org.apache.james.mailbox.events.EventBusTestFixture; import org.apache.james.mailbox.events.EventDeadLetters; +import org.apache.james.mailbox.events.Group; +import org.apache.james.mailbox.events.InVMEventBus; import org.apache.james.mailbox.events.MailboxListener; import org.apache.james.mailbox.events.MemoryEventDeadLetters; +import org.apache.james.mailbox.events.RetryBackoffConfiguration; +import org.apache.james.mailbox.events.delivery.InVmEventDelivery; import org.apache.james.mailbox.inmemory.InMemoryId; import org.apache.james.mailbox.inmemory.InMemoryMessageId; import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.mailbox.store.event.EventFactory; +import org.apache.james.mailbox.util.EventCollector; +import org.apache.james.metrics.api.NoopMetricFactory; import org.apache.james.metrics.logger.DefaultMetricFactory; +import org.apache.james.task.MemoryTaskManager; import org.apache.james.webadmin.WebAdminServer; import org.apache.james.webadmin.WebAdminUtils; +import org.apache.james.webadmin.service.EventDeadLettersRedeliverTask; +import org.apache.james.webadmin.service.EventDeadLettersService; import org.apache.james.webadmin.utils.ErrorResponder; import org.apache.james.webadmin.utils.JsonTransformer; import org.eclipse.jetty.http.HttpStatus; @@ -55,6 +66,7 @@ import io.restassured.RestAssured; import io.restassured.http.ContentType; class EventDeadLettersRoutesTest { + private static final String EVENTS_ACTION = "reDeliver"; private static final String BOB = "b...@apache.org"; private static final String UUID_1 = "6e0dd59d-660e-4d9b-b22f-0354479f47b4"; private static final String UUID_2 = "6e0dd59d-660e-4d9b-b22f-0354479f47b5"; @@ -85,23 +97,27 @@ class EventDeadLettersRoutesTest { " \"sessionId\":452" + " }" + "}"; - public static final String SERIALIZED_GROUP_A = new EventBusTestFixture.GroupA().asString(); + private static final String SERIALIZED_GROUP_A = new EventBusTestFixture.GroupA().asString(); + private static final String SERIALIZED_GROUP_B = new EventBusTestFixture.GroupB().asString(); private WebAdminServer webAdminServer; private EventDeadLetters deadLetters; + private EventBus eventBus; + private MemoryTaskManager taskManager; @BeforeEach void beforeEach() throws Exception { deadLetters = new MemoryEventDeadLetters(); JsonTransformer jsonTransformer = new JsonTransformer(); EventSerializer eventSerializer = new EventSerializer(new InMemoryId.Factory(), new InMemoryMessageId.Factory()); + eventBus = new InVMEventBus(new InVmEventDelivery(new NoopMetricFactory()), RetryBackoffConfiguration.DEFAULT, deadLetters); + EventDeadLettersService service = new EventDeadLettersService(deadLetters, eventBus, eventSerializer); + taskManager = new MemoryTaskManager(); webAdminServer = WebAdminUtils.createWebAdminServer( new DefaultMetricFactory(), - new EventDeadLettersRoutes( - deadLetters, - jsonTransformer, - eventSerializer)); + new EventDeadLettersRoutes(service, taskManager, jsonTransformer), + new TasksRoutes(taskManager, jsonTransformer)); webAdminServer.configure(NO_CONFIGURATION); webAdminServer.await(); @@ -112,6 +128,7 @@ class EventDeadLettersRoutesTest { @AfterEach void tearDown() { webAdminServer.destroy(); + taskManager.stop(); } @Nested @@ -170,7 +187,7 @@ class EventDeadLettersRoutesTest { @Test void listEventsShouldFailWhenInvalidGroup() { when() - .get("/events/deadLetter/groups/invalid/events") + .get("/events/deadLetter/groups/invalid") .then() .statusCode(HttpStatus.BAD_REQUEST_400) .contentType(ContentType.JSON) @@ -182,7 +199,7 @@ class EventDeadLettersRoutesTest { @Test void listEventsShouldReturnEmptyWhenNone() { when() - .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/events") + .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A) .then() .statusCode(HttpStatus.OK_200) .contentType(ContentType.JSON) @@ -194,7 +211,7 @@ class EventDeadLettersRoutesTest { deadLetters.store(new EventBusTestFixture.GroupA(), EVENT_1).block(); when() - .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/events") + .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A) .then() .statusCode(HttpStatus.OK_200) .contentType(ContentType.JSON) @@ -207,7 +224,7 @@ class EventDeadLettersRoutesTest { deadLetters.store(new EventBusTestFixture.GroupB(), EVENT_2).block(); when() - .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/events") + .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A) .then() .statusCode(HttpStatus.OK_200) .contentType(ContentType.JSON) @@ -220,7 +237,7 @@ class EventDeadLettersRoutesTest { deadLetters.store(new EventBusTestFixture.GroupA(), EVENT_2).block(); when() - .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/events") + .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A) .then() .statusCode(HttpStatus.OK_200) .contentType(ContentType.JSON) @@ -235,7 +252,7 @@ class EventDeadLettersRoutesTest { deadLetters.store(new EventBusTestFixture.GroupA(), EVENT_1).block(); String response = when() - .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/events/" + UUID_1) + .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_1) .then() .statusCode(HttpStatus.OK_200) .contentType(ContentType.JSON) @@ -248,7 +265,7 @@ class EventDeadLettersRoutesTest { @Test void getEventShouldReturn404WhenNotFound() { when() - .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/events/" + UUID_1) + .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_1) .then() .statusCode(HttpStatus.NOT_FOUND_404); } @@ -256,7 +273,7 @@ class EventDeadLettersRoutesTest { @Test void getEventShouldFailWhenInvalidEventId() { when() - .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/events/invalid") + .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/invalid") .then() .statusCode(HttpStatus.BAD_REQUEST_400) .contentType(ContentType.JSON) @@ -268,7 +285,7 @@ class EventDeadLettersRoutesTest { @Test void getEventShouldFailWhenInvalidGroup() { when() - .get("/events/deadLetter/groups/invalid/events/" + UUID_1) + .get("/events/deadLetter/groups/invalid/" + UUID_1) .then() .statusCode(HttpStatus.BAD_REQUEST_400) .contentType(ContentType.JSON) @@ -285,7 +302,7 @@ class EventDeadLettersRoutesTest { deadLetters.store(new EventBusTestFixture.GroupA(), EVENT_1).block(); when() - .delete("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/events/" + UUID_1) + .delete("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_1) .then() .statusCode(HttpStatus.NO_CONTENT_204); } @@ -293,7 +310,7 @@ class EventDeadLettersRoutesTest { @Test void deleteShouldReturnOkWhenEventNotFound() { when() - .delete("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/events/" + UUID_1) + .delete("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_1) .then() .statusCode(HttpStatus.NO_CONTENT_204); } @@ -301,7 +318,7 @@ class EventDeadLettersRoutesTest { @Test void deleteShouldFailWhenInvalidGroup() { when() - .delete("/events/deadLetter/groups/invalid/events/" + UUID_1) + .delete("/events/deadLetter/groups/invalid/" + UUID_1) .then() .statusCode(HttpStatus.BAD_REQUEST_400) .contentType(ContentType.JSON) @@ -313,7 +330,7 @@ class EventDeadLettersRoutesTest { @Test void deleteShouldFailWhenInvalidEventId() { when() - .delete("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/events/invalid") + .delete("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/invalid") .then() .statusCode(HttpStatus.BAD_REQUEST_400) .contentType(ContentType.JSON) @@ -327,12 +344,460 @@ class EventDeadLettersRoutesTest { deadLetters.store(new EventBusTestFixture.GroupA(), EVENT_1).block(); with() - .delete("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/events/" + UUID_1); + .delete("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_1); when() - .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/events/" + UUID_1) + .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_1) .then() .statusCode(HttpStatus.NOT_FOUND_404); } } + + @Nested + class RedeliverAllEvents { + private Group groupA; + private Group groupB; + + @BeforeEach + void nestedBeforeEach() { + EventCollector eventCollectorA = new EventCollector(); + EventCollector eventCollectorB = new EventCollector(); + groupA = new EventBusTestFixture.GroupA(); + groupB = new EventBusTestFixture.GroupB(); + eventBus.register(eventCollectorA, groupA); + eventBus.register(eventCollectorB, groupB); + } + + @Test + void postRedeliverAllEventsShouldCreateATask() { + deadLetters.store(groupA, EVENT_1).block(); + + given() + .queryParam("action", EVENTS_ACTION) + .when() + .post("/events/deadLetter") + .then() + .statusCode(HttpStatus.CREATED_201) + .header("Location", is(notNullValue())) + .body("taskId", is(notNullValue())); + } + + @Test + void postRedeliverAllEventsShouldHaveSuccessfulCompletedTask() { + deadLetters.store(groupA, EVENT_1).block(); + + String taskId = with() + .queryParam("action", EVENTS_ACTION) + .post("/events/deadLetter") + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("status", is("completed")) + .body("taskId", is(taskId)) + .body("additionalInformation.successfulRedeliveriesCount", is(1)) + .body("additionalInformation.failedRedeliveriesCount", is(0)) + .body("type", is(EventDeadLettersRedeliverTask.TYPE)) + .body("startedDate", is(notNullValue())) + .body("submitDate", is(notNullValue())) + .body("completedDate", is(notNullValue())); + } + + @Test + void postRedeliverAllEventsShouldRedeliverAndRemoveEventFromDeadLetters() { + deadLetters.store(groupA, EVENT_1).block(); + + String taskId = with() + .queryParam("action", EVENTS_ACTION) + .post("/events/deadLetter") + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("status", is("completed")) + .body("additionalInformation.successfulRedeliveriesCount", is(1)) + .body("additionalInformation.failedRedeliveriesCount", is(0)); + + when() + .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_1) + .then() + .statusCode(HttpStatus.NOT_FOUND_404); + } + + @Test + void postRedeliverAllEventsShouldRedeliverAndRemoveAllEventsFromDeadLetters() { + deadLetters.store(groupA, EVENT_1).block(); + deadLetters.store(groupA, EVENT_2).block(); + deadLetters.store(groupB, EVENT_2).block(); + + String taskId = with() + .queryParam("action", EVENTS_ACTION) + .post("/events/deadLetter") + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("status", is("completed")) + .body("additionalInformation.successfulRedeliveriesCount", is(3)) + .body("additionalInformation.failedRedeliveriesCount", is(0)); + + when() + .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_1) + .then() + .statusCode(HttpStatus.NOT_FOUND_404); + + when() + .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_2) + .then() + .statusCode(HttpStatus.NOT_FOUND_404); + + when() + .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_B + "/" + UUID_2) + .then() + .statusCode(HttpStatus.NOT_FOUND_404); + } + + @Test + void postRedeliverAllEventsShouldFailWhenInvalidAction() { + deadLetters.store(groupA, EVENT_1).block(); + + given() + .queryParam("action", "invalid-action") + .when() + .post("/events/deadLetter") + .then() + .statusCode(HttpStatus.BAD_REQUEST_400) + .contentType(ContentType.JSON) + .body("statusCode", is(400)) + .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType())) + .body("message", is("Invalid arguments supplied in the user request")) + .body("details", is("'invalid-action' is not a valid action query parameter")); + } + + @Test + void postRedeliverAllEventsShouldFailWhenMissingAction() { + deadLetters.store(new EventBusTestFixture.GroupA(), EVENT_1).block(); + + when() + .post("/events/deadLetter") + .then() + .statusCode(HttpStatus.BAD_REQUEST_400) + .contentType(ContentType.JSON) + .body("statusCode", is(400)) + .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType())) + .body("message", is("Invalid arguments supplied in the user request")) + .body("details", is("'action' url parameter is mandatory")); + } + } + + @Nested + class RedeliverGroupEvents { + private Group groupA; + + @BeforeEach + void nestedBeforeEach() { + EventCollector eventCollector = new EventCollector(); + groupA = new EventBusTestFixture.GroupA(); + eventBus.register(eventCollector, groupA); + } + + @Test + void postRedeliverGroupEventsShouldCreateATask() { + deadLetters.store(groupA, EVENT_1).block(); + + given() + .queryParam("action", EVENTS_ACTION) + .when() + .post("/events/deadLetter/groups/" + SERIALIZED_GROUP_A) + .then() + .statusCode(HttpStatus.CREATED_201) + .header("Location", is(notNullValue())) + .body("taskId", is(notNullValue())); + } + + @Test + void postRedeliverGroupEventsShouldHaveSuccessfulCompletedTask() { + deadLetters.store(groupA, EVENT_1).block(); + + String taskId = with() + .queryParam("action", EVENTS_ACTION) + .post("/events/deadLetter/groups/" + SERIALIZED_GROUP_A) + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("status", is("completed")) + .body("taskId", is(taskId)) + .body("additionalInformation.successfulRedeliveriesCount", is(1)) + .body("additionalInformation.failedRedeliveriesCount", is(0)) + .body("type", is(EventDeadLettersRedeliverTask.TYPE)) + .body("startedDate", is(notNullValue())) + .body("submitDate", is(notNullValue())) + .body("completedDate", is(notNullValue())); + } + + @Test + void postRedeliverGroupEventsShouldRedeliverAndRemoveEventFromDeadLetters() { + deadLetters.store(groupA, EVENT_1).block(); + + String taskId = with() + .queryParam("action", EVENTS_ACTION) + .post("/events/deadLetter/groups/" + SERIALIZED_GROUP_A) + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("status", is("completed")) + .body("additionalInformation.successfulRedeliveriesCount", is(1)) + .body("additionalInformation.failedRedeliveriesCount", is(0)); + + when() + .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_1) + .then() + .statusCode(HttpStatus.NOT_FOUND_404); + } + + @Test + void postRedeliverGroupEventsShouldRedeliverAndRemoveAllGroupEventsFromDeadLetters() { + deadLetters.store(groupA, EVENT_1).block(); + deadLetters.store(groupA, EVENT_2).block(); + + String taskId = with() + .queryParam("action", EVENTS_ACTION) + .post("/events/deadLetter/groups/" + SERIALIZED_GROUP_A) + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("status", is("completed")) + .body("additionalInformation.successfulRedeliveriesCount", is(2)) + .body("additionalInformation.failedRedeliveriesCount", is(0)); + + when() + .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_1) + .then() + .statusCode(HttpStatus.NOT_FOUND_404); + + when() + .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_2) + .then() + .statusCode(HttpStatus.NOT_FOUND_404); + } + + @Test + void postRedeliverGroupEventsShouldFailWhenInvalidAction() { + deadLetters.store(groupA, EVENT_1).block(); + + given() + .queryParam("action", "invalid-action") + .when() + .post("/events/deadLetter/groups/" + SERIALIZED_GROUP_A) + .then() + .statusCode(HttpStatus.BAD_REQUEST_400) + .contentType(ContentType.JSON) + .body("statusCode", is(400)) + .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType())) + .body("message", is("Invalid arguments supplied in the user request")) + .body("details", is("'invalid-action' is not a valid action query parameter")); + } + + @Test + void postRedeliverGroupEventsShouldFailWhenMissingAction() { + deadLetters.store(new EventBusTestFixture.GroupA(), EVENT_1).block(); + + when() + .post("/events/deadLetter/groups/" + SERIALIZED_GROUP_A) + .then() + .statusCode(HttpStatus.BAD_REQUEST_400) + .contentType(ContentType.JSON) + .body("statusCode", is(400)) + .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType())) + .body("message", is("Invalid arguments supplied in the user request")) + .body("details", is("'action' url parameter is mandatory")); + } + + @Test + void postRedeliverGroupEventsShouldFailWhenInvalidGroup() { + given() + .queryParam("action", EVENTS_ACTION) + .when() + .post("/events/deadLetter/groups/invalid") + .then() + .statusCode(HttpStatus.BAD_REQUEST_400) + .contentType(ContentType.JSON) + .body("statusCode", is(400)) + .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType())) + .body("message", is("Can not deserialize the supplied group: invalid")); + } + } + + @Nested + class RedeliverSingleEvent { + private Group groupA; + + @BeforeEach + void nestedBeforeEach() { + EventCollector eventCollector = new EventCollector(); + groupA = new EventBusTestFixture.GroupA(); + eventBus.register(eventCollector, groupA); + } + + @Test + void postRedeliverSingleEventShouldCreateATask() { + deadLetters.store(groupA, EVENT_1).block(); + + given() + .queryParam("action", EVENTS_ACTION) + .when() + .post("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_1) + .then() + .statusCode(HttpStatus.CREATED_201) + .header("Location", is(notNullValue())) + .body("taskId", is(notNullValue())); + } + + @Test + void postRedeliverSingleEventShouldHaveSuccessfulCompletedTask() { + deadLetters.store(groupA, EVENT_1).block(); + + String taskId = with() + .queryParam("action", EVENTS_ACTION) + .post("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_1) + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("status", is("completed")) + .body("taskId", is(taskId)) + .body("additionalInformation.successfulRedeliveriesCount", is(1)) + .body("additionalInformation.failedRedeliveriesCount", is(0)) + .body("type", is(EventDeadLettersRedeliverTask.TYPE)) + .body("startedDate", is(notNullValue())) + .body("submitDate", is(notNullValue())) + .body("completedDate", is(notNullValue())); + } + + @Test + void postRedeliverSingleEventShouldRedeliverAndRemoveEventFromDeadLetters() { + deadLetters.store(groupA, EVENT_1).block(); + + String taskId = with() + .queryParam("action", EVENTS_ACTION) + .post("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_1) + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("status", is("completed")) + .body("additionalInformation.successfulRedeliveriesCount", is(1)) + .body("additionalInformation.failedRedeliveriesCount", is(0)); + + when() + .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_1) + .then() + .statusCode(HttpStatus.NOT_FOUND_404); + } + + @Test + void postRedeliverSingleEventShouldReturn404WhenEventNotFound() { + given() + .queryParam("action", EVENTS_ACTION) + .when() + .get("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_1) + .then() + .statusCode(HttpStatus.NOT_FOUND_404); + } + + @Test + void postRedeliverSingleEventShouldFailWhenInvalidAction() { + deadLetters.store(groupA, EVENT_1).block(); + + given() + .queryParam("action", "invalid-action") + .when() + .post("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_1) + .then() + .statusCode(HttpStatus.BAD_REQUEST_400) + .contentType(ContentType.JSON) + .body("statusCode", is(400)) + .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType())) + .body("message", is("Invalid arguments supplied in the user request")) + .body("details", is("'invalid-action' is not a valid action query parameter")); + } + + @Test + void postRedeliverSingleEventShouldFailWhenMissingAction() { + deadLetters.store(new EventBusTestFixture.GroupA(), EVENT_1).block(); + + when() + .post("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/" + UUID_1) + .then() + .statusCode(HttpStatus.BAD_REQUEST_400) + .contentType(ContentType.JSON) + .body("statusCode", is(400)) + .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType())) + .body("message", is("Invalid arguments supplied in the user request")) + .body("details", is("'action' url parameter is mandatory")); + } + + @Test + void postRedeliverSingleEventShouldFailWhenInvalidEventId() { + given() + .queryParam("action", EVENTS_ACTION) + .when() + .post("/events/deadLetter/groups/" + SERIALIZED_GROUP_A + "/invalid") + .then() + .statusCode(HttpStatus.BAD_REQUEST_400) + .contentType(ContentType.JSON) + .body("statusCode", is(400)) + .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType())) + .body("message", is("Can not deserialize the supplied eventId: invalid")); + } + + @Test + void postRedeliverSingleEventShouldFailWhenInvalidGroup() { + given() + .queryParam("action", EVENTS_ACTION) + .when() + .post("/events/deadLetter/groups/invalid/" + UUID_1) + .then() + .statusCode(HttpStatus.BAD_REQUEST_400) + .contentType(ContentType.JSON) + .body("statusCode", is(400)) + .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType())) + .body("message", is("Can not deserialize the supplied group: invalid")); + } + } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org