JAMES-2556 Fix reprocessing upon concurent mail deletion
Single key reprocessing should fail on missing key while full reprocessing
should not.
Because the argument list keeps growing adding a 'Reprocessor' object helps
readability.
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/f15655b2
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/f15655b2
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/f15655b2
Branch: refs/heads/master
Commit: f15655b20e3e0e3b35707773ba70ddded2563403
Parents: 95b4ead
Author: Benoit Tellier <[email protected]>
Authored: Wed Oct 10 10:41:29 2018 +0700
Committer: Benoit Tellier <[email protected]>
Committed: Wed Oct 10 15:46:19 2018 +0700
----------------------------------------------------------------------
.../webadmin/service/ReprocessingService.java | 47 +++++++++++++++-----
.../service/ReprocessingServiceTest.java | 2 -
2 files changed, 36 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/f15655b2/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
----------------------------------------------------------------------
diff --git
a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
index f02bd3f..c316b95 100644
---
a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
+++
b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/service/ReprocessingService.java
@@ -31,12 +31,38 @@ import
org.apache.james.mailrepository.api.MailRepositoryPath;
import org.apache.james.mailrepository.api.MailRepositoryStore;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueFactory;
+import org.apache.james.util.OptionalUtils;
import org.apache.james.util.streams.Iterators;
import org.apache.mailet.Mail;
import com.github.fge.lambdas.Throwing;
public class ReprocessingService {
+ public static class MissingKeyException extends RuntimeException {
+ MissingKeyException(MailKey key) {
+ super(key.asString() + " can not be found");
+ }
+ }
+
+ static class Reprocessor {
+ private final MailQueue mailQueue;
+ private final Optional<String> targetProcessor;
+
+ Reprocessor(MailQueue mailQueue, Optional<String> targetProcessor) {
+ this.mailQueue = mailQueue;
+ this.targetProcessor = targetProcessor;
+ }
+
+ private void reprocess(MailRepository repository, Mail mail) {
+ try {
+ targetProcessor.ifPresent(mail::setState);
+ mailQueue.enQueue(mail);
+ repository.remove(mail);
+ } catch (Exception e) {
+ throw new RuntimeException("Error encountered while
reprocessing mail " + mail.getName(), e);
+ }
+ }
+ }
private final MailQueueFactory<?> mailQueueFactory;
private final MailRepositoryStoreService mailRepositoryStoreService;
@@ -49,29 +75,28 @@ public class ReprocessingService {
}
public void reprocessAll(MailRepositoryPath path, Optional<String>
targetProcessor, String targetQueue, Consumer<MailKey> keyListener) throws
MailRepositoryStore.MailRepositoryStoreException, MessagingException {
- MailQueue mailQueue = getMailQueue(targetQueue);
+ Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue),
targetProcessor);
mailRepositoryStoreService
.getRepositories(path)
.forEach(Throwing.consumer((MailRepository repository) ->
Iterators.toStream(repository.list())
.peek(keyListener)
- .forEach(Throwing.consumer(key -> reprocess(repository,
mailQueue, key, targetProcessor)))).sneakyThrow());
+ .map(Throwing.function(key ->
Optional.ofNullable(repository.retrieve(key))))
+ .flatMap(OptionalUtils::toStream)
+ .forEach(mail -> reprocessor.reprocess(repository,
mail))));
}
public void reprocess(MailRepositoryPath path, MailKey key,
Optional<String> targetProcessor, String targetQueue) throws
MailRepositoryStore.MailRepositoryStoreException, MessagingException {
- MailQueue mailQueue = getMailQueue(targetQueue);
+ Reprocessor reprocessor = new Reprocessor(getMailQueue(targetQueue),
targetProcessor);
mailRepositoryStoreService
.getRepositories(path)
- .forEach(Throwing.consumer((MailRepository repository) ->
reprocess(repository, mailQueue, key, targetProcessor)).sneakyThrow());
- }
-
- private void reprocess(MailRepository repository, MailQueue mailQueue,
MailKey key, Optional<String> targetProcessor) throws MessagingException {
- Mail mail = repository.retrieve(key);
- targetProcessor.ifPresent(mail::setState);
- mailQueue.enQueue(mail);
- repository.remove(key);
+ .forEach(Throwing.consumer((MailRepository repository) ->
+ reprocessor.reprocess(repository,
+ Optional.ofNullable(repository.retrieve(key))
+ .orElseThrow(() -> new MissingKeyException(key))))
+ .sneakyThrow());
}
private MailQueue getMailQueue(String targetQueue) {
http://git-wip-us.apache.org/repos/asf/james-project/blob/f15655b2/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
----------------------------------------------------------------------
diff --git
a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
index c1d3688..9e09a8d 100644
---
a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
+++
b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
@@ -42,7 +42,6 @@ import
org.apache.james.server.core.configuration.FileConfigurationProvider;
import org.apache.james.server.core.filesystem.FileSystemImpl;
import org.apache.mailet.base.test.FakeMail;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import com.github.fge.lambdas.Throwing;
@@ -148,7 +147,6 @@ public class ReprocessingServiceTest {
.containsOnly(NAME_1, NAME_2, NAME_3);
}
- @Ignore
@Test
public void reprocessingShouldNotFailOnConcurrentDeletion() throws
Exception {
MailRepository repository =
mailRepositoryStore.select(MailRepositoryUrl.fromPathAndProtocol(PATH,
MEMORY_PROTOCOL));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]