JAMES-2556 Fix reprocessing upon concurrent mail deletion

    Single key reprocessing should fail on missing key while full reprocessing 
should not.

    Because the argument list keeps growing, adding a 'Reprocessor' object helps 
readability.


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/f15655b2
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/f15655b2
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/f15655b2

Branch: refs/heads/master
Commit: f15655b20e3e0e3b35707773ba70ddded2563403
Parents: 95b4ead
Author: Benoit Tellier <btell...@linagora.com>
Authored: Wed Oct 10 10:41:29 2018 +0700
Committer: Benoit Tellier <btell...@linagora.com>
Committed: Wed Oct 10 15:46:19 2018 +0700

----------------------------------------------------------------------
 .../webadmin/service/ReprocessingService.java   | 47 +++++++++++++++-----
 .../service/ReprocessingServiceTest.java        |  2 -
 2 files changed, 36 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/f15655b2/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
----------------------------------------------------------------------
diff --git 
a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
 
b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
index f02bd3f..c316b95 100644
--- 
a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
+++ 
b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
@@ -31,12 +31,38 @@ import 
org.apache.james.mailrepository.api.MailRepositoryPath;
 import org.apache.james.mailrepository.api.MailRepositoryStore;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.util.OptionalUtils;
 import org.apache.james.util.streams.Iterators;
 import org.apache.mailet.Mail;
 
 import com.github.fge.lambdas.Throwing;
 
 public class ReprocessingService {
+    public static class MissingKeyException extends RuntimeException {
+        MissingKeyException(MailKey key) {
+            super(key.asString() + " can not be found");
+        }
+    }
+
+    static class Reprocessor {
+        private final MailQueue mailQueue;
+        private final Optional<String> targetProcessor;
+
+        Reprocessor(MailQueue mailQueue, Optional<String> targetProcessor) {
+            this.mailQueue = mailQueue;
+            this.targetProcessor = targetProcessor;
+        }
+
+        private void reprocess(MailRepository repository, Mail mail) {
+            try {
+                targetProcessor.ifPresent(mail::setState);
+                mailQueue.enQueue(mail);
+                repository.remove(mail);
+            } catch (Exception e) {
+                throw new RuntimeException("Error encountered while 
reprocessing mail " + mail.getName(), e);
+            }
+        }
+    }
 
     private final MailQueueFactory<?> mailQueueFactory;
     private final MailRepositoryStoreService mailRepositoryStoreService;
@@ -49,29 +75,28 @@ public class ReprocessingService {
     }
 
     public void reprocessAll(MailRepositoryPath path, Optional<String> 
targetProcessor, String targetQueue, Consumer<MailKey> keyListener) throws 
MailRepositoryStore.MailRepositoryStoreException, MessagingException {
-        MailQueue mailQueue = getMailQueue(targetQueue);
+        Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), 
targetProcessor);
 
         mailRepositoryStoreService
             .getRepositories(path)
             .forEach(Throwing.consumer((MailRepository repository) ->
                 Iterators.toStream(repository.list())
                     .peek(keyListener)
-                    .forEach(Throwing.consumer(key -> reprocess(repository, 
mailQueue, key, targetProcessor)))).sneakyThrow());
+                    .map(Throwing.function(key -> 
Optional.ofNullable(repository.retrieve(key))))
+                    .flatMap(OptionalUtils::toStream)
+                    .forEach(mail -> reprocessor.reprocess(repository, 
mail))));
     }
 
     public void reprocess(MailRepositoryPath path, MailKey key, 
Optional<String> targetProcessor, String targetQueue) throws 
MailRepositoryStore.MailRepositoryStoreException, MessagingException {
-        MailQueue mailQueue = getMailQueue(targetQueue);
+        Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue), 
targetProcessor);
 
         mailRepositoryStoreService
             .getRepositories(path)
-            .forEach(Throwing.consumer((MailRepository repository) -> 
reprocess(repository, mailQueue, key, targetProcessor)).sneakyThrow());
-    }
-
-    private void reprocess(MailRepository repository, MailQueue mailQueue, 
MailKey key, Optional<String> targetProcessor) throws MessagingException {
-        Mail mail = repository.retrieve(key);
-        targetProcessor.ifPresent(mail::setState);
-        mailQueue.enQueue(mail);
-        repository.remove(key);
+            .forEach(Throwing.consumer((MailRepository repository) ->
+                reprocessor.reprocess(repository,
+                    Optional.ofNullable(repository.retrieve(key))
+                        .orElseThrow(() -> new MissingKeyException(key))))
+                .sneakyThrow());
     }
 
     private MailQueue getMailQueue(String targetQueue) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/f15655b2/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
----------------------------------------------------------------------
diff --git 
a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
 
b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
index c1d3688..9e09a8d 100644
--- 
a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
+++ 
b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
@@ -42,7 +42,6 @@ import 
org.apache.james.server.core.configuration.FileConfigurationProvider;
 import org.apache.james.server.core.filesystem.FileSystemImpl;
 import org.apache.mailet.base.test.FakeMail;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import com.github.fge.lambdas.Throwing;
@@ -148,7 +147,6 @@ public class ReprocessingServiceTest {
             .containsOnly(NAME_1, NAME_2, NAME_3);
     }
 
-    @Ignore
     @Test
     public void reprocessingShouldNotFailOnConcurrentDeletion() throws 
Exception {
         MailRepository repository = 
mailRepositoryStore.select(MailRepositoryUrl.fromPathAndProtocol(PATH, 
MEMORY_PROTOCOL));


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to