JAMES-2556 Fix reprocessing upon concurrent mail deletion. Single-key reprocessing should fail on a missing key, while full reprocessing should not.
Because the argument list keeps growing adding a 'Reprocessor' object helps readability. Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/f15655b2 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/f15655b2 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/f15655b2 Branch: refs/heads/master Commit: f15655b20e3e0e3b35707773ba70ddded2563403 Parents: 95b4ead Author: Benoit Tellier <btell...@linagora.com> Authored: Wed Oct 10 10:41:29 2018 +0700 Committer: Benoit Tellier <btell...@linagora.com> Committed: Wed Oct 10 15:46:19 2018 +0700 ---------------------------------------------------------------------- .../webadmin/service/ReprocessingService.java | 47 +++++++++++++++----- .../service/ReprocessingServiceTest.java | 2 - 2 files changed, 36 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/f15655b2/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java ---------------------------------------------------------------------- diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java index f02bd3f..c316b95 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java @@ -31,12 +31,38 @@ import org.apache.james.mailrepository.api.MailRepositoryPath; import org.apache.james.mailrepository.api.MailRepositoryStore; import org.apache.james.queue.api.MailQueue; import 
org.apache.james.queue.api.MailQueueFactory; +import org.apache.james.util.OptionalUtils; import org.apache.james.util.streams.Iterators; import org.apache.mailet.Mail; import com.github.fge.lambdas.Throwing; public class ReprocessingService { + public static class MissingKeyException extends RuntimeException { + MissingKeyException(MailKey key) { + super(key.asString() + " can not be found"); + } + } + + static class Reprocessor { + private final MailQueue mailQueue; + private final Optional<String> targetProcessor; + + Reprocessor(MailQueue mailQueue, Optional<String> targetProcessor) { + this.mailQueue = mailQueue; + this.targetProcessor = targetProcessor; + } + + private void reprocess(MailRepository repository, Mail mail) { + try { + targetProcessor.ifPresent(mail::setState); + mailQueue.enQueue(mail); + repository.remove(mail); + } catch (Exception e) { + throw new RuntimeException("Error encountered while reprocessing mail " + mail.getName(), e); + } + } + } private final MailQueueFactory<?> mailQueueFactory; private final MailRepositoryStoreService mailRepositoryStoreService; @@ -49,29 +75,28 @@ public class ReprocessingService { } public void reprocessAll(MailRepositoryPath path, Optional<String> targetProcessor, String targetQueue, Consumer<MailKey> keyListener) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException { - MailQueue mailQueue = getMailQueue(targetQueue); + Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), targetProcessor); mailRepositoryStoreService .getRepositories(path) .forEach(Throwing.consumer((MailRepository repository) -> Iterators.toStream(repository.list()) .peek(keyListener) - .forEach(Throwing.consumer(key -> reprocess(repository, mailQueue, key, targetProcessor)))).sneakyThrow()); + .map(Throwing.function(key -> Optional.ofNullable(repository.retrieve(key)))) + .flatMap(OptionalUtils::toStream) + .forEach(mail -> reprocessor.reprocess(repository, mail)))); } public void 
reprocess(MailRepositoryPath path, MailKey key, Optional<String> targetProcessor, String targetQueue) throws MailRepositoryStore.MailRepositoryStoreException, MessagingException { - MailQueue mailQueue = getMailQueue(targetQueue); + Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), targetProcessor); mailRepositoryStoreService .getRepositories(path) - .forEach(Throwing.consumer((MailRepository repository) -> reprocess(repository, mailQueue, key, targetProcessor)).sneakyThrow()); - } - - private void reprocess(MailRepository repository, MailQueue mailQueue, MailKey key, Optional<String> targetProcessor) throws MessagingException { - Mail mail = repository.retrieve(key); - targetProcessor.ifPresent(mail::setState); - mailQueue.enQueue(mail); - repository.remove(key); + .forEach(Throwing.consumer((MailRepository repository) -> + reprocessor.reprocess(repository, + Optional.ofNullable(repository.retrieve(key)) + .orElseThrow(() -> new MissingKeyException(key)))) + .sneakyThrow()); } private MailQueue getMailQueue(String targetQueue) { http://git-wip-us.apache.org/repos/asf/james-project/blob/f15655b2/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java ---------------------------------------------------------------------- diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java index c1d3688..9e09a8d 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java @@ -42,7 +42,6 @@ import org.apache.james.server.core.configuration.FileConfigurationProvider; import 
org.apache.james.server.core.filesystem.FileSystemImpl; import org.apache.mailet.base.test.FakeMail; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import com.github.fge.lambdas.Throwing; @@ -148,7 +147,6 @@ public class ReprocessingServiceTest { .containsOnly(NAME_1, NAME_2, NAME_3); } - @Ignore @Test public void reprocessingShouldNotFailOnConcurrentDeletion() throws Exception { MailRepository repository = mailRepositoryStore.select(MailRepositoryUrl.fromPathAndProtocol(PATH, MEMORY_PROTOCOL)); --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org