This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7bc6a36334b4765d373b7610f26a1f1d044fbe28 Author: Rémi KOWALSKI <rkowal...@linagora.com> AuthorDate: Tue Jul 7 17:32:23 2020 +0200 JAMES-3296 Add task to republish RabbitMQ MailQueue from Cassandra --- pom.xml | 5 + .../guice/cassandra-rabbitmq-guice/pom.xml | 4 + .../james/CassandraRabbitMQJamesServerMain.java | 3 +- server/container/guice/pom.xml | 6 + .../webadmin-rabbitmq-mailqueue}/pom.xml | 25 ++- .../server/RabbitMailQueueRoutesModule.java | 36 +++++ .../RabbitMailQueueTaskSerializationModule.java | 53 +++++++ server/container/guice/rabbitmq/pom.xml | 8 + ...dminServerTaskSerializationIntegrationTest.java | 23 ++- server/protocols/webadmin/pom.xml | 1 + .../protocols/webadmin/webadmin-mailqueue/pom.xml | 2 +- .../pom.xml | 18 ++- .../webadmin/routes/RabbitMQMailQueuesRoutes.java | 174 +++++++++++++++++++++ ...ProcessedMailsTaskAdditionalInformationDTO.java | 90 +++++++++++ .../service/RepublishNotProcessedMailsTaskDTO.java | 85 ++++++++++ .../service/RepublishNotprocessedMailsTask.java | 107 +++++++++++++ .../routes/RabbitMQMailQueuesRoutesTest.java | 144 +++++++++++++++++ .../RepublishNotprocessedMailsTaskTest.java | 108 +++++++++++++ src/site/markdown/server/manage-webadmin.md | 35 +++++ 19 files changed, 903 insertions(+), 24 deletions(-) diff --git a/pom.xml b/pom.xml index f2de229..0e7b225 100644 --- a/pom.xml +++ b/pom.xml @@ -1877,6 +1877,11 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> + <artifactId>james-server-webadmin-rabbitmq</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> <artifactId>james-server-webadmin-swagger</artifactId> <version>${project.version}</version> </dependency> diff --git a/server/container/guice/cassandra-rabbitmq-guice/pom.xml b/server/container/guice/cassandra-rabbitmq-guice/pom.xml index 1b02418..6edd29c 100644 --- a/server/container/guice/cassandra-rabbitmq-guice/pom.xml +++ b/server/container/guice/cassandra-rabbitmq-guice/pom.xml @@ -181,6 +181,10 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> + <artifactId>james-server-webadmin-rabbitmq</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> <artifactId>testing-base</artifactId> <scope>test</scope> </dependency> diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java index 2e40639..322acd3 100644 --- a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java +++ b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java @@ -29,6 +29,7 @@ import org.apache.james.modules.blobstore.BlobStoreModulesChooser; import org.apache.james.modules.event.RabbitMQEventBusModule; import org.apache.james.modules.rabbitmq.RabbitMQModule; import org.apache.james.modules.server.JMXServerModule; +import org.apache.james.modules.server.RabbitMailQueueRoutesModule; import com.google.inject.Module; import com.google.inject.util.Modules; @@ -37,7 +38,7 @@ public class CassandraRabbitMQJamesServerMain implements JamesServerMain { protected static final Module MODULES = Modules .override(Modules.combine(REQUIRE_TASK_MANAGER_MODULE, new DistributedTaskManagerModule())) - .with(new RabbitMQModule(), new RabbitMQEventBusModule(), new DistributedTaskSerializationModule()); + .with(new RabbitMQModule(), new RabbitMailQueueRoutesModule(), new RabbitMQEventBusModule(), new DistributedTaskSerializationModule()); public static void main(String[] args) throws Exception { CassandraRabbitMQJamesConfiguration configuration = CassandraRabbitMQJamesConfiguration.builder() diff --git a/server/container/guice/pom.xml b/server/container/guice/pom.xml index bf8fe7f..cbae105 100644 --- a/server/container/guice/pom.xml +++ b/server/container/guice/pom.xml @@ -71,6 +71,7 @@ <module>protocols/webadmin-mailbox</module> <module>protocols/webadmin-mailqueue</module> <module>protocols/webadmin-mailrepository</module> + <module>protocols/webadmin-rabbitmq-mailqueue</module> <module>protocols/webadmin-swagger</module> <module>rabbitmq</module> <module>testing</module> @@ -200,6 +201,11 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> + <artifactId>james-server-guice-webadmin-rabbitmq-mailqueue</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> <artifactId>james-server-guice-webadmin-mailrepository</artifactId> <version>${project.version}</version> </dependency> diff --git a/server/container/guice/rabbitmq/pom.xml b/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/pom.xml similarity index 73% copy from server/container/guice/rabbitmq/pom.xml copy to server/container/guice/protocols/webadmin-rabbitmq-mailqueue/pom.xml index b117e4f..9d625f4 100644 --- a/server/container/guice/rabbitmq/pom.xml +++ b/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/pom.xml @@ -18,35 +18,29 @@ under the License. --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> - <groupId>org.apache.james</groupId> <artifactId>james-server-guice</artifactId> + <groupId>org.apache.james</groupId> <version>3.6.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> + <relativePath>../../pom.xml</relativePath> </parent> - <artifactId>james-server-guice-rabbitmq</artifactId> + <artifactId>james-server-guice-webadmin-rabbitmq-mailqueue</artifactId> - <name>Apache James :: Server :: Guice :: RabbitMQ</name> - <description>Guice Module for RabbitMQ</description> + <name>Apache James :: Server :: Guice :: Webadmin :: RabbitMQ :: MailQueue</name> + <description>Webadmin rabbitMQ mailqueue modules for Guice implementation of James server</description> <dependencies> <dependency> <groupId>${james.groupId}</groupId> - <artifactId>apache-james-backends-rabbitmq</artifactId> - </dependency> - <dependency> - <groupId>${james.groupId}</groupId> - <artifactId>james-server-guice-configuration</artifactId> + <artifactId>james-server-webadmin-core</artifactId> </dependency> <dependency> <groupId>${james.groupId}</groupId> - <artifactId>james-server-queue-api</artifactId> - </dependency> - <dependency> - <groupId>${james.groupId}</groupId> - <artifactId>james-server-queue-rabbitmq</artifactId> + <artifactId>james-server-webadmin-rabbitmq</artifactId> </dependency> <dependency> <groupId>${james.groupId}</groupId> @@ -58,5 +52,4 @@ <artifactId>guice</artifactId> </dependency> </dependencies> - </project> diff --git a/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/src/main/java/org/apache/james/modules/server/RabbitMailQueueRoutesModule.java b/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/src/main/java/org/apache/james/modules/server/RabbitMailQueueRoutesModule.java new file mode 100644 index 0000000..dbffe76 --- /dev/null +++ b/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/src/main/java/org/apache/james/modules/server/RabbitMailQueueRoutesModule.java @@ -0,0 +1,36 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.modules.server; + +import org.apache.james.webadmin.Routes; +import org.apache.james.webadmin.routes.RabbitMQMailQueuesRoutes; + +import com.google.inject.AbstractModule; +import com.google.inject.multibindings.Multibinder; + +public class RabbitMailQueueRoutesModule extends AbstractModule { + @Override + protected void configure() { + install(new RabbitMailQueueTaskSerializationModule()); + + Multibinder<Routes> routesMultibinder = Multibinder.newSetBinder(binder(), Routes.class); + routesMultibinder.addBinding().to(RabbitMQMailQueuesRoutes.class); + } +} diff --git a/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/src/main/java/org/apache/james/modules/server/RabbitMailQueueTaskSerializationModule.java b/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/src/main/java/org/apache/james/modules/server/RabbitMailQueueTaskSerializationModule.java new file mode 100644 index 0000000..61d7048 --- /dev/null +++ b/server/container/guice/protocols/webadmin-rabbitmq-mailqueue/src/main/java/org/apache/james/modules/server/RabbitMailQueueTaskSerializationModule.java @@ -0,0 +1,53 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.modules.server; + +import org.apache.james.queue.api.MailQueueFactory; +import org.apache.james.queue.rabbitmq.RabbitMQMailQueue; +import org.apache.james.server.task.json.dto.AdditionalInformationDTO; +import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule; +import org.apache.james.server.task.json.dto.TaskDTO; +import org.apache.james.server.task.json.dto.TaskDTOModule; +import org.apache.james.task.Task; +import org.apache.james.task.TaskExecutionDetails; +import org.apache.james.webadmin.dto.DTOModuleInjections; +import org.apache.james.webadmin.service.RepublishNotProcessedMailsTaskAdditionalInformationDTO; +import org.apache.james.webadmin.service.RepublishNotProcessedMailsTaskDTO; + +import com.google.inject.AbstractModule; +import com.google.inject.multibindings.ProvidesIntoSet; +import com.google.inject.name.Named; + +public class RabbitMailQueueTaskSerializationModule extends AbstractModule { + @ProvidesIntoSet + public TaskDTOModule<? extends Task, ? extends TaskDTO> republishNotProcessedMailsTask(MailQueueFactory<RabbitMQMailQueue> mailQueueFactory) { + return RepublishNotProcessedMailsTaskDTO.module(mailQueueFactory); + } + + @ProvidesIntoSet + public AdditionalInformationDTOModule<? extends TaskExecutionDetails.AdditionalInformation, ? extends AdditionalInformationDTO> republishNotProcessedMailsAdditionalInformation() { + return RepublishNotProcessedMailsTaskAdditionalInformationDTO.module(); + } + + @Named(DTOModuleInjections.WEBADMIN_DTO) + @ProvidesIntoSet + public AdditionalInformationDTOModule<? extends TaskExecutionDetails.AdditionalInformation, ? extends AdditionalInformationDTO> republishNotProcessedMailsAdditionalInformationWebAdmin() { + return RepublishNotProcessedMailsTaskAdditionalInformationDTO.module(); + } +} diff --git a/server/container/guice/rabbitmq/pom.xml b/server/container/guice/rabbitmq/pom.xml index b117e4f..5c4b0cc 100644 --- a/server/container/guice/rabbitmq/pom.xml +++ b/server/container/guice/rabbitmq/pom.xml @@ -42,6 +42,14 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> + <artifactId>james-server-guice-webadmin-rabbitmq-mailqueue</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-server-webadmin-rabbitmq</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> <artifactId>james-server-queue-api</artifactId> </dependency> <dependency> diff --git a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java index f0775df..3d5e3ed 100644 --- a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java +++ b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQWebAdminServerTaskSerializationIntegrationTest.java @@ -797,4 +797,25 @@ class RabbitMQWebAdminServerTaskSerializationIntegrationTest { .mailboxPath(MailboxPath.forUser(Username.of(USERNAME), "Important-mailbox")) .build(); } -} \ No newline at end of file + + @Test + void republishNotProcessedMailsOnSpoolShouldComplete() { + String taskId = with() + .basePath("/mailQueues/spool") + .queryParam("action", "RepublishNotProcessedMails") + .queryParam("olderThan", "2d") + .post() + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("status", is("completed")) + .body("taskId", is(taskId)) + .body("type", is("republish-not-processed-mails")) + .body("additionalInformation.nbRequeuedMails", is(0)); + } +} diff --git a/server/protocols/webadmin/pom.xml b/server/protocols/webadmin/pom.xml index 4600e0a..db7f387 100644 --- a/server/protocols/webadmin/pom.xml +++ b/server/protocols/webadmin/pom.xml @@ -42,6 +42,7 @@ <module>webadmin-mailbox-deleted-message-vault</module> <module>webadmin-mailqueue</module> <module>webadmin-mailrepository</module> + <module>webadmin-rabbitmq</module> <module>webadmin-swagger</module> </modules> diff --git a/server/protocols/webadmin/webadmin-mailqueue/pom.xml b/server/protocols/webadmin/webadmin-mailqueue/pom.xml index f119ef2..a35a339 100644 --- a/server/protocols/webadmin/webadmin-mailqueue/pom.xml +++ b/server/protocols/webadmin/webadmin-mailqueue/pom.xml @@ -129,7 +129,7 @@ <version>v1</version> </info> <swaggerDirectory>${project.build.directory}</swaggerDirectory> - <swaggerFileName>webadmin-mailbox</swaggerFileName> + <swaggerFileName>webadmin-mailqueue</swaggerFileName> </apiSource> </apiSources> </configuration> diff --git a/server/protocols/webadmin/webadmin-mailqueue/pom.xml b/server/protocols/webadmin/webadmin-rabbitmq/pom.xml similarity index 88% copy from server/protocols/webadmin/webadmin-mailqueue/pom.xml copy to server/protocols/webadmin/webadmin-rabbitmq/pom.xml index f119ef2..862deff 100644 --- a/server/protocols/webadmin/webadmin-mailqueue/pom.xml +++ b/server/protocols/webadmin/webadmin-rabbitmq/pom.xml @@ -27,10 +27,10 @@ <relativePath>../../../pom.xml</relativePath> </parent> - <artifactId>james-server-webadmin-mailqueue</artifactId> + <artifactId>james-server-webadmin-rabbitmq</artifactId> <packaging>jar</packaging> - <name>Apache James :: Server :: Web Admin :: MailQueue</name> + <name>Apache James :: Server :: Web Admin :: RabbitMQ</name> <dependencies> <dependency> @@ -56,7 +56,11 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> - <artifactId>james-server-queue-memory</artifactId> + <artifactId>james-server-queue-rabbitmq</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-server-testing</artifactId> <scope>test</scope> </dependency> <dependency> @@ -65,7 +69,7 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> - <artifactId>james-server-task-memory</artifactId> + <artifactId>james-server-task-distributed</artifactId> <scope>test</scope> </dependency> <dependency> @@ -80,6 +84,10 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> + <artifactId>james-server-webadmin-mailqueue</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> <artifactId>metrics-tests</artifactId> <scope>test</scope> </dependency> @@ -129,7 +137,7 @@ <version>v1</version> </info> <swaggerDirectory>${project.build.directory}</swaggerDirectory> - <swaggerFileName>webadmin-mailbox</swaggerFileName> + <swaggerFileName>webadmin-rabbitmq</swaggerFileName> </apiSource> </apiSources> </configuration> diff --git a/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/routes/RabbitMQMailQueuesRoutes.java b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/routes/RabbitMQMailQueuesRoutes.java new file mode 100644 index 0000000..c366df6 --- /dev/null +++ b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/routes/RabbitMQMailQueuesRoutes.java @@ -0,0 +1,174 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.webadmin.routes; + +import static org.apache.james.webadmin.Constants.SEPARATOR; +import static org.apache.james.webadmin.routes.MailQueueRoutes.BASE_URL; +import static org.apache.james.webadmin.routes.MailQueueRoutes.MAIL_QUEUE_NAME; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.function.Predicate; + +import javax.inject.Inject; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; + +import org.apache.james.queue.api.MailQueueFactory; +import org.apache.james.queue.api.MailQueueName; +import org.apache.james.queue.rabbitmq.RabbitMQMailQueue; +import org.apache.james.task.Task; +import org.apache.james.task.TaskManager; +import org.apache.james.util.DurationParser; +import org.apache.james.webadmin.Routes; +import org.apache.james.webadmin.service.RepublishNotprocessedMailsTask; +import org.apache.james.webadmin.tasks.TaskFromRequest; +import org.apache.james.webadmin.tasks.TaskFromRequestRegistry; +import org.apache.james.webadmin.tasks.TaskRegistrationKey; +import org.apache.james.webadmin.utils.ErrorResponder; +import org.apache.james.webadmin.utils.JsonTransformer; +import org.eclipse.jetty.http.HttpStatus; + +import com.google.common.annotations.VisibleForTesting; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import spark.Request; +import spark.Service; + +@Api(tags = "MailQueues") +@Path(BASE_URL) +@Produces("application/json") +public class RabbitMQMailQueuesRoutes implements Routes { + + private static final TaskRegistrationKey REPUBLISH_NOT_PROCESSED_MAILS_REGISTRATION_KEY = TaskRegistrationKey.of("RepublishNotProcessedMails"); + + private final MailQueueFactory<RabbitMQMailQueue> mailQueueFactory; + private final JsonTransformer jsonTransformer; + private final TaskManager taskManager; + private final Clock clock; + + @Inject + @SuppressWarnings("unchecked") + @VisibleForTesting + RabbitMQMailQueuesRoutes(MailQueueFactory<RabbitMQMailQueue> mailQueueFactory, + Clock clock, JsonTransformer jsonTransformer, TaskManager taskManager) { + this.mailQueueFactory = mailQueueFactory; + this.clock = clock; + this.jsonTransformer = jsonTransformer; + this.taskManager = taskManager; + } + + @Override + public String getBasePath() { + return BASE_URL; + } + + @Override + public void define(Service service) { + republishNotProcessedMails(service); + } + + + @POST + @Path("/{mailQueueName}") + @ApiImplicitParams({ + @ApiImplicitParam(required = true, dataType = "string", name = "mailQueueName", paramType = "path"), + @ApiImplicitParam( + required = true, + dataType = "String", + name = "action", + paramType = "query", + example = "?action=RepublishNotProcessedMails", + value = "Specify the action to perform on a RabbitMQ mail queue."), + @ApiImplicitParam( + required = true, + dataType = "String", + name = "olderThan", + paramType = "query", + example = "?olderThan=1w", + value = "Specify the messages minimum age to republish") + }) + @ApiOperation( + value = "republish the not processed mails of the RabbitMQ MailQueue using the cassandra mail queue view" + ) + @ApiResponses(value = { + @ApiResponse(code = HttpStatus.CREATED_201, message = "OK, the task for rebuilding the queue is created"), + @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "Invalid request for rebuilding the mail queue."), + @ApiResponse(code = HttpStatus.INTERNAL_SERVER_ERROR_500, message = "Internal server error - Something went bad on the server side.") + }) + public void republishNotProcessedMails(Service service) { + TaskFromRequest taskFromRequest = this::republishNotProcessedMails; + service.post(BASE_URL + SEPARATOR + MAIL_QUEUE_NAME, + TaskFromRequestRegistry.builder() + .register(REPUBLISH_NOT_PROCESSED_MAILS_REGISTRATION_KEY, this::republishNotProcessedMails) + .buildAsRoute(taskManager), + jsonTransformer); + } + + private Task republishNotProcessedMails(Request request) { + RabbitMQMailQueue mailQueue = getMailQueue(MailQueueName.of(request.params(MAIL_QUEUE_NAME))); + return new RepublishNotprocessedMailsTask(mailQueue, getOlderThan(request)); + } + + + private RabbitMQMailQueue getMailQueue(MailQueueName mailQueueName) { + return mailQueueFactory.getQueue(mailQueueName) + .orElseThrow( + () -> ErrorResponder.builder() + .message("%s can not be found", mailQueueName) + .statusCode(HttpStatus.NOT_FOUND_404) + .type(ErrorResponder.ErrorType.NOT_FOUND) + .haltError()); + } + + private Instant getOlderThan(Request req) { + try { + Duration olderThan = Optional.ofNullable(req.queryParams("olderThan")) + .filter(Predicate.not(String::isEmpty)) + .map(rawString -> DurationParser.parse(rawString, ChronoUnit.DAYS)) + .orElseThrow(); + + return clock.instant().minus(olderThan); + } catch (NoSuchElementException e) { + throw ErrorResponder.builder() + .message("Missing olderThan") + .statusCode(HttpStatus.BAD_REQUEST_400) + .type(ErrorResponder.ErrorType.INVALID_ARGUMENT) + .haltError(); + } catch (Exception e) { + throw ErrorResponder.builder() + .statusCode(HttpStatus.BAD_REQUEST_400) + .cause(e) + .type(ErrorResponder.ErrorType.INVALID_ARGUMENT) + .message("Invalid olderThan") + .haltError(); + } + } +} diff --git a/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotProcessedMailsTaskAdditionalInformationDTO.java b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotProcessedMailsTaskAdditionalInformationDTO.java new file mode 100644 index 0000000..3621530 --- /dev/null +++ b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotProcessedMailsTaskAdditionalInformationDTO.java @@ -0,0 +1,90 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.webadmin.service; + +import java.time.Instant; + +import org.apache.james.json.DTOModule; +import org.apache.james.queue.api.MailQueueName; +import org.apache.james.server.task.json.dto.AdditionalInformationDTO; +import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class RepublishNotProcessedMailsTaskAdditionalInformationDTO implements AdditionalInformationDTO { + + public static AdditionalInformationDTOModule<RepublishNotprocessedMailsTask.AdditionalInformation, RepublishNotProcessedMailsTaskAdditionalInformationDTO> module() { + return DTOModule.forDomainObject(RepublishNotprocessedMailsTask.AdditionalInformation.class) + .convertToDTO(RepublishNotProcessedMailsTaskAdditionalInformationDTO.class) + .toDomainObjectConverter(dto -> new RepublishNotprocessedMailsTask.AdditionalInformation( + MailQueueName.of(dto.mailQueue), + dto.olderThan, + dto.nbRequeuedMails, + dto.timestamp)) + .toDTOConverter((details, type) -> new RepublishNotProcessedMailsTaskAdditionalInformationDTO( + type, + details.getMailQueue().asString(), + details.getOlderThan(), + details.getNbRequeuedMails(), + details.timestamp())) + .typeName(RepublishNotprocessedMailsTask.TYPE.asString()) + .withFactory(AdditionalInformationDTOModule::new); + } + + private final String type; + private final String mailQueue; + + private final long nbRequeuedMails; + private final Instant olderThan; + private final Instant timestamp; + + public RepublishNotProcessedMailsTaskAdditionalInformationDTO(@JsonProperty("type") String type, + @JsonProperty("mailQueue") String mailQueue, + @JsonProperty("olderThan") Instant olderThan, + @JsonProperty("nbRequeuedMails") long nbRequeuedMails, + @JsonProperty("timestamp") Instant timestamp) { + this.type = type; + this.mailQueue = mailQueue; + this.olderThan = olderThan; + this.nbRequeuedMails = nbRequeuedMails; + this.timestamp = timestamp; + } + + public String getMailQueue() { + return mailQueue; + } + + public long getNbRequeuedMails() { + return nbRequeuedMails; + } + + public Instant getOlderThan() { + return olderThan; + } + + @Override + public String getType() { + return type; + } + + @Override + public Instant getTimestamp() { + return timestamp; + } +} diff --git a/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotProcessedMailsTaskDTO.java b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotProcessedMailsTaskDTO.java new file mode 100644 index 0000000..5502b35 --- /dev/null +++ b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotProcessedMailsTaskDTO.java @@ -0,0 +1,85 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.webadmin.service; + +import java.time.Instant; + +import org.apache.james.json.DTOModule; +import org.apache.james.queue.api.MailQueueFactory; +import org.apache.james.queue.api.MailQueueName; +import org.apache.james.queue.rabbitmq.RabbitMQMailQueue; +import org.apache.james.server.task.json.dto.TaskDTO; +import org.apache.james.server.task.json.dto.TaskDTOModule; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class RepublishNotProcessedMailsTaskDTO implements TaskDTO { + + public static class UnknownMailQueueException extends RuntimeException { + public UnknownMailQueueException(MailQueueName mailQueueName) { + super("Unknown mail queue " + mailQueueName.asString()); + } + } + + public static TaskDTOModule<RepublishNotprocessedMailsTask, RepublishNotProcessedMailsTaskDTO> module(MailQueueFactory<RabbitMQMailQueue> mailQueueFactory) { + return DTOModule + .forDomainObject(RepublishNotprocessedMailsTask.class) + .convertToDTO(RepublishNotProcessedMailsTaskDTO.class) + .toDomainObjectConverter(dto -> dto.fromDTO(mailQueueFactory)) + .toDTOConverter(RepublishNotProcessedMailsTaskDTO::toDTO) + .typeName(RepublishNotprocessedMailsTask.TYPE.asString()) + .withFactory(TaskDTOModule::new); + } + + public static RepublishNotProcessedMailsTaskDTO toDTO(RepublishNotprocessedMailsTask domainObject, String typeName) { + return new RepublishNotProcessedMailsTaskDTO(typeName, domainObject.getMailQueue().asString(), domainObject.getOlderThan()); + } + + private final String type; + private final String mailQueue; + private final Instant olderThan; + + public RepublishNotProcessedMailsTaskDTO(@JsonProperty("type") String type, @JsonProperty("mailQueue") String mailQueue, @JsonProperty("olderThan") Instant olderThan) { + this.type = type; + this.mailQueue = mailQueue; + this.olderThan = olderThan; + } + + public RepublishNotprocessedMailsTask fromDTO(MailQueueFactory<RabbitMQMailQueue> mailQueueFactory) { + MailQueueName requestedMailQueueName = MailQueueName.of(mailQueue); + RabbitMQMailQueue queue = mailQueueFactory + .getQueue(requestedMailQueueName) + .orElseThrow(() -> new UnknownMailQueueException(requestedMailQueueName)); + + return new RepublishNotprocessedMailsTask(queue, olderThan); + } + + @Override + public String getType() { + return type; + } + + public Instant getOlderThan() { + return olderThan; + } + + public String getMailQueue() { + return mailQueue; + } +} diff --git a/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotprocessedMailsTask.java b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotprocessedMailsTask.java new file mode 100644 index 0000000..0428335 --- /dev/null +++ b/server/protocols/webadmin/webadmin-rabbitmq/src/main/java/org/apache/james/webadmin/service/RepublishNotprocessedMailsTask.java @@ -0,0 +1,107 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.webadmin.service; + +import java.time.Clock; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.james.queue.api.MailQueueName; +import org.apache.james.queue.rabbitmq.RabbitMQMailQueue; +import org.apache.james.task.Task; +import org.apache.james.task.TaskExecutionDetails; +import org.apache.james.task.TaskType; + + +public class RepublishNotprocessedMailsTask implements Task { + + public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation { + + private final Instant timestamp; + private final long nbRequeuedMails; + private final MailQueueName mailQueue; + private final Instant olderThan; + + public AdditionalInformation(MailQueueName mailQueue, Instant olderThan, long nbRequeuedMails, Instant timestamp) { + this.mailQueue = mailQueue; + this.olderThan = olderThan; + this.timestamp = timestamp; + this.nbRequeuedMails = nbRequeuedMails; + } + + @Override + public Instant timestamp() { + return timestamp; + } + + public Instant getOlderThan() { + return olderThan; + } + + public MailQueueName getMailQueue() { + return mailQueue; + } + + public long getNbRequeuedMails() { + return nbRequeuedMails; + } + } + + public static final TaskType TYPE = TaskType.of("republish-not-processed-mails"); + + private final Instant olderThan; + private final RabbitMQMailQueue mailQueue; + private final AtomicInteger nbRequeuedMails; + + public RepublishNotprocessedMailsTask(RabbitMQMailQueue mailQueue, Instant olderThan) { + this.olderThan = olderThan; + this.mailQueue = mailQueue; + this.nbRequeuedMails = new AtomicInteger(0); + } + + @Override + public Result run() { + mailQueue.republishNotProcessedMails(olderThan) + .doOnNext(mailName -> nbRequeuedMails.getAndIncrement()) + .then() + .block(); + + return Result.COMPLETED; + } + + @Override + public TaskType type() { + return TYPE; + } + + @Override + public Optional<TaskExecutionDetails.AdditionalInformation> details() { + return Optional.of(new AdditionalInformation(mailQueue.getName(), olderThan, nbRequeuedMails.get(), Clock.systemUTC().instant())); + } + + public Instant getOlderThan() { + return olderThan; + } + + public MailQueueName getMailQueue() { + return mailQueue.getName(); + } +} diff --git a/server/protocols/webadmin/webadmin-rabbitmq/src/test/java/org/apache/james/webadmin/routes/RabbitMQMailQueuesRoutesTest.java b/server/protocols/webadmin/webadmin-rabbitmq/src/test/java/org/apache/james/webadmin/routes/RabbitMQMailQueuesRoutesTest.java new file mode 100644 index 0000000..4daaa08 --- /dev/null +++ b/server/protocols/webadmin/webadmin-rabbitmq/src/test/java/org/apache/james/webadmin/routes/RabbitMQMailQueuesRoutesTest.java @@ -0,0 +1,144 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.webadmin.routes; + +import static io.restassured.RestAssured.given; +import static io.restassured.config.EncoderConfig.encoderConfig; +import static io.restassured.config.RestAssuredConfig.newConfig; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.nio.charset.StandardCharsets; +import java.time.Clock; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.Optional; + +import org.apache.james.json.DTOConverter; +import org.apache.james.queue.api.MailQueueFactory; +import org.apache.james.queue.rabbitmq.RabbitMQMailQueue; +import org.apache.james.queue.rabbitmq.RabbitMQMailQueueFactory; +import org.apache.james.task.Hostname; +import org.apache.james.task.MemoryTaskManager; +import org.apache.james.task.TaskManager; +import org.apache.james.utils.UpdatableTickingClock; +import org.apache.james.webadmin.WebAdminServer; +import org.apache.james.webadmin.WebAdminUtils; +import org.apache.james.webadmin.utils.JsonTransformer; +import org.eclipse.jetty.http.HttpStatus; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.restassured.RestAssured; +import io.restassured.builder.RequestSpecBuilder; +import io.restassured.http.ContentType; +import io.restassured.specification.RequestSpecification; + +class RabbitMQMailQueuesRoutesTest { + final static ZonedDateTime DATE = ZonedDateTime.parse("2015-10-30T14:12:00Z"); + + WebAdminServer webAdminServer; + MailQueueFactory mailQueueFactory; + Clock clock; + + WebAdminServer createServer(MailQueueFactory mailQueueFactory) { + TaskManager taskManager = new MemoryTaskManager(new Hostname("foo")); + JsonTransformer jsonTransformer = new JsonTransformer(); + clock = UpdatableTickingClock.fixed(DATE.toInstant(), ZoneOffset.UTC); + return WebAdminUtils.createWebAdminServer( + new RabbitMQMailQueuesRoutes(mailQueueFactory, clock, jsonTransformer, taskManager), + new TasksRoutes(taskManager, jsonTransformer, DTOConverter.of())) + .start(); + } + + RequestSpecification buildRequestSpecification(WebAdminServer server) { + return new RequestSpecBuilder() + .setContentType(ContentType.JSON) + .setAccept(ContentType.JSON) + .setBasePath("/") + .setPort(server.getPort().getValue()) + .setConfig(newConfig().encoderConfig(encoderConfig().defaultContentCharset(StandardCharsets.UTF_8))) + .build(); + } + + @BeforeEach + void setUp() { + mailQueueFactory = mock(RabbitMQMailQueueFactory.class); + webAdminServer = createServer(mailQueueFactory); + RestAssured.requestSpecification = buildRequestSpecification(webAdminServer); + RestAssured.enableLoggingOfRequestAndResponseIfValidationFails(); + } + + @AfterEach + void tearDown() { + webAdminServer.destroy(); + } + + @Test + void triggeringARepublishNotProcessedMailsShouldCreateATask() { + when(mailQueueFactory.getQueue(any())).thenReturn(Optional.of(mock(RabbitMQMailQueue.class))); + given() + .queryParam("action", "RepublishNotProcessedMails") + .queryParam("olderThan", "1d") + .when() + .post(MailQueueRoutes.BASE_URL + "/spooler") + .then() + .statusCode(HttpStatus.CREATED_201); + + given() + .when() + .get("/tasks") + .then() + .statusCode(HttpStatus.OK_200) + .body("", hasSize(1)); + } + + @Test + void triggeringARepublishNotProcessedMailsWhenTheQueueHasNotBeenInitializedShouldFail() { + when(mailQueueFactory.getQueue(any())).thenReturn(Optional.empty()); + given() + .queryParam("action", "RepublishNotProcessedMails") + .queryParam("olderThan", "1d") + .when() + .post(MailQueueRoutes.BASE_URL + "/spooler") + .then() + .statusCode(HttpStatus.NOT_FOUND_404) + .body("message", containsString("MailQueueName{value=spooler} can not be found")); + } + + @Test + void triggeringARepublishNotProcessedMailsWithAnInvalidOlderThanShouldFail() { + when(mailQueueFactory.getQueue(any())).thenReturn(Optional.of(mock(RabbitMQMailQueue.class))); + given() + .queryParam("action", "RepublishNotProcessedMails") + .queryParam("olderThan", "invalidValue") + .when() + .post(MailQueueRoutes.BASE_URL + "/spooler") + .then() + .statusCode(HttpStatus.BAD_REQUEST_400) + .body("message", containsString("Invalid olderThan")) + .body("details", containsString("Supplied value do not follow the unit format (number optionally suffixed with a string representing the unit")); + } + +} diff --git a/server/protocols/webadmin/webadmin-rabbitmq/src/test/java/org/apache/james/webadmin/service/RepublishNotprocessedMailsTaskTest.java b/server/protocols/webadmin/webadmin-rabbitmq/src/test/java/org/apache/james/webadmin/service/RepublishNotprocessedMailsTaskTest.java new file mode 100644 index 0000000..ac35da1 --- /dev/null +++ b/server/protocols/webadmin/webadmin-rabbitmq/src/test/java/org/apache/james/webadmin/service/RepublishNotprocessedMailsTaskTest.java @@ -0,0 +1,108 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + * ***************************************************************/ + +package org.apache.james.webadmin.service; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.time.Instant; +import java.util.Optional; + +import org.apache.james.JsonSerializationVerifier; +import org.apache.james.json.JsonGenericSerializer; +import org.apache.james.queue.api.MailQueueName; +import org.apache.james.queue.rabbitmq.RabbitMQMailQueue; +import org.apache.james.queue.rabbitmq.RabbitMQMailQueueFactory; +import org.junit.jupiter.api.Test; + +class RepublishNotprocessedMailsTaskTest { + private static final Instant OLDER_THAN = Instant.parse("2018-11-13T12:00:55Z"); + private static final Instant NOW = Instant.now(); + private static final long NB_REQUEUED_MAILS = 12; + private static final String SERIALIZED = "{\"type\": \"republish-not-processed-mails\",\"mailQueue\":\"anyQueue\", \"olderThan\": \"" + OLDER_THAN + "\"}"; + private static final String SERIALIZED_TASK_ADDITIONAL_INFORMATION = "{\"type\": \"republish-not-processed-mails\",\"mailQueue\":\"anyQueue\", \"olderThan\": \"" + OLDER_THAN + "\" ,\"nbRequeuedMails\":12,\"timestamp\":\"" + NOW.toString() + "\"}"; + private static final MailQueueName QUEUE_NAME = MailQueueName.of("anyQueue"); + + @Test + void taskShouldBeSerializable() throws Exception { + RabbitMQMailQueueFactory mockedQueueFactory = mock(RabbitMQMailQueueFactory.class); + RabbitMQMailQueue mockedQueue = mock(RabbitMQMailQueue.class); + + when(mockedQueue.getName()).thenReturn(QUEUE_NAME); + when(mockedQueueFactory.getQueue(QUEUE_NAME)).thenReturn(Optional.of(mockedQueue)); + + RepublishNotprocessedMailsTask task = new RepublishNotprocessedMailsTask(mockedQueue, OLDER_THAN); + + JsonSerializationVerifier.dtoModule(RepublishNotProcessedMailsTaskDTO.module(mockedQueueFactory)) + .bean(task) + .json(SERIALIZED) + .verify(); + } + + @Test + void taskShouldBeDeserializable() throws Exception { + RabbitMQMailQueueFactory mockedQueueFactory = mock(RabbitMQMailQueueFactory.class); + RabbitMQMailQueue mockedQueue = mock(RabbitMQMailQueue.class); + + when(mockedQueue.getName()).thenReturn(QUEUE_NAME); + when(mockedQueueFactory.getQueue(QUEUE_NAME)).thenReturn(Optional.of(mockedQueue)); + + RepublishNotprocessedMailsTask task = new RepublishNotprocessedMailsTask(mockedQueue, OLDER_THAN); + JsonSerializationVerifier.dtoModule(RepublishNotProcessedMailsTaskDTO.module(mockedQueueFactory)) + .bean(task) + .json(SERIALIZED) + .verify(); + } + + @Test + void taskDeserializationFromUnknownQueueNameShouldThrow() { + RabbitMQMailQueueFactory mockedQueueFactory = mock(RabbitMQMailQueueFactory.class); + RabbitMQMailQueue mockedQueue = mock(RabbitMQMailQueue.class); + + when(mockedQueue.getName()).thenReturn(QUEUE_NAME); + when(mockedQueueFactory.getQueue(QUEUE_NAME)).thenReturn(Optional.empty()); + + RepublishNotprocessedMailsTask task = new RepublishNotprocessedMailsTask(mockedQueue, OLDER_THAN); + assertThatThrownBy(() -> JsonSerializationVerifier.dtoModule(RepublishNotProcessedMailsTaskDTO.module(mockedQueueFactory)) + .bean(task) + .json(SERIALIZED) + .verify()) + .isInstanceOf(RepublishNotProcessedMailsTaskDTO.UnknownMailQueueException.class); + } + + @Test + void additionalInformationShouldBeSerializable() throws Exception { + RepublishNotprocessedMailsTask.AdditionalInformation details = new RepublishNotprocessedMailsTask.AdditionalInformation(QUEUE_NAME, OLDER_THAN, NB_REQUEUED_MAILS, NOW); + JsonSerializationVerifier.dtoModule(RepublishNotProcessedMailsTaskAdditionalInformationDTO.module()) + .bean(details) + .json(SERIALIZED_TASK_ADDITIONAL_INFORMATION) + .verify(); + } + + @Test + void additionalInformationShouldBeDeserializable() throws Exception { + RepublishNotprocessedMailsTask.AdditionalInformation details = new RepublishNotprocessedMailsTask.AdditionalInformation(QUEUE_NAME, OLDER_THAN, NB_REQUEUED_MAILS, NOW); + RepublishNotprocessedMailsTask.AdditionalInformation deserialized = JsonGenericSerializer.forModules(RepublishNotProcessedMailsTaskAdditionalInformationDTO.module()) + .withoutNestedType() + .deserialize(SERIALIZED_TASK_ADDITIONAL_INFORMATION); + assertThat(deserialized).isEqualToComparingFieldByField(details); + } +} diff --git a/src/site/markdown/server/manage-webadmin.md b/src/site/markdown/server/manage-webadmin.md index 707417d..ff18829 100644 --- a/src/site/markdown/server/manage-webadmin.md +++ b/src/site/markdown/server/manage-webadmin.md @@ -2900,6 +2900,7 @@ The scheduled task will have the following type `reprocessing-one` and the follo - [Deleting mails from a mail queue](#Deleting_mails_from_a_mail_queue) - [Clearing a mail queue](#Clearing_a_mail_queue) - [Flushing mails from a mail queue](#Flushing_mails_from_a_mail_queue) + - [RabbitMQ republishing a mail queue from cassandra](#RabbitMQ_republishing_a_mail_queue_from_cassandra) ### Listing mail queues @@ -3048,6 +3049,40 @@ Response codes: - 204: Success (No content) - 400: Invalid request - 404: The mail queue does not exist + +### RabbitMQ republishing a mail queue from cassandra + +``` +curl -XPOST 'http://ip:port/mailQueues/{mailQueueName}?action=RepublishNotProcessedMails&olderThan=1d' +``` + +This method is specific to the distributed flavor of James, which relies on Cassandra and RabbitMQ for implementing a mail queue. +In case of a RabbitMQ crash resulting in a loss of messages, this task can be launched to repopulate the +`mailQueueName` queue in RabbitMQ using the information stored in Cassandra. + +The `olderThan` parameter is mandatory. It filters the mails to be restored, by taking into account only +the mails older than the given value. +The expected value should be expressed in the following format: `Nunit`. +`N` should be strictly positive. +`unit` could be either in the short form (`h`, `d`, `w`, etc.), or in the long form (`day`, `week`, `month`, etc.). + +Examples: + + - `5h` + - `7d` + - `1y` + +Response codes: + + - 201: Task created + - 400: Invalid request + + The response body contains the id of the republishing task. + ``` + { + "taskId": "a650a66a-5984-431e-bdad-f1baad885856" + } + ``` ## Administrating DLP Configuration --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org