This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 961ef33e62b74982ef3cc17768887288f9b84910 Author: Benoit Tellier <[email protected]> AuthorDate: Tue Jun 18 11:50:30 2019 +0700 JAMES-2794 RabbitMQ MailQueue projections should rely on EnqueueId This allows insertions of several emails sharing the same name. --- .../org/apache/james/queue/rabbitmq/Dequeuer.java | 22 +++++++++---- .../apache/james/queue/rabbitmq/EnqueuedItem.java | 37 +++++++++++++--------- .../org/apache/james/queue/rabbitmq/Enqueuer.java | 13 +++++--- .../james/queue/rabbitmq/MailReferenceDTO.java | 24 +++++++++++--- .../queue/rabbitmq/view/api/DeleteCondition.java | 23 +++++++++++--- .../queue/rabbitmq/view/api/MailQueueView.java | 4 +-- .../view/cassandra/CassandraMailQueueBrowser.java | 3 +- .../cassandra/CassandraMailQueueMailDelete.java | 15 +++------ .../view/cassandra/CassandraMailQueueView.java | 19 ++++++----- .../cassandra/CassandraMailQueueViewModule.java | 15 +++++---- .../rabbitmq/view/cassandra/DeletedMailsDAO.java | 21 ++++++------ .../rabbitmq/view/cassandra/EnqueuedMailsDAO.java | 9 ++++-- .../view/cassandra/EnqueuedMailsDaoUtil.java | 8 +++-- .../james/queue/rabbitmq/EnqueuedItemTest.java | 17 ++++++++++ .../apache/james/queue/rabbitmq/MailDTOTest.java | 3 ++ .../queue/rabbitmq/RabbitMQMailQueueTest.java | 7 ---- .../view/cassandra/DeletedMailsDAOTest.java | 35 +++++++++++--------- .../view/cassandra/EnqueuedMailsDaoTest.java | 17 ++++++---- .../model/EnqueuedItemWithSlicingContextTest.java | 5 ++- .../src/test/resources/json/mail1.json | 1 + .../src/test/resources/json/mail_min.json | 1 + 21 files changed, 191 insertions(+), 108 deletions(-) diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java index ee59815..95f13ee 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.function.Consumer; import java.util.function.Function; +import org.apache.commons.lang3.tuple.Pair; import org.apache.james.metrics.api.Metric; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.queue.api.MailQueue; @@ -46,10 +47,12 @@ class Dequeuer { private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem { private final Consumer<Boolean> ack; + private final EnQueueId enQueueId; private final Mail mail; - private RabbitMQMailQueueItem(Consumer<Boolean> ack, Mail mail) { + private RabbitMQMailQueueItem(Consumer<Boolean> ack, EnQueueId enQueueId, Mail mail) { this.ack = ack; + this.enQueueId = enQueueId; this.mail = mail; } @@ -58,6 +61,10 @@ class Dequeuer { return mail; } + public EnQueueId getEnQueueId() { + return enQueueId; + } + @Override public void done(boolean success) { ack.accept(success); @@ -87,7 +94,7 @@ class Dequeuer { } private Mono<RabbitMQMailQueueItem> filterIfDeleted(RabbitMQMailQueueItem item) { - return mailQueueView.isPresent(item.getMail()) + return mailQueueView.isPresent(item.getEnQueueId()) .flatMap(isPresent -> { if (isPresent) { return Mono.just(item); @@ -99,9 +106,10 @@ class Dequeuer { private Mono<RabbitMQMailQueueItem> loadItem(AcknowledgableDelivery response) { try { - Mail mail = loadMail(response); + Pair<EnQueueId, Mail> idAndMail = loadMail(response); + Mail mail = idAndMail.getRight(); ThrowingConsumer<Boolean> ack = ack(response, mail); - return Mono.just(new RabbitMQMailQueueItem(ack, mail)); + return Mono.just(new RabbitMQMailQueueItem(ack, idAndMail.getLeft(), mail)); } catch (MailQueue.MailQueueException e) { return Mono.error(e); } @@ -119,9 +127,11 @@ class Dequeuer { }; } - private Mail loadMail(Delivery response) throws MailQueue.MailQueueException { + private Pair<EnQueueId, Mail> loadMail(Delivery response) throws MailQueue.MailQueueException { MailReferenceDTO mailDTO = toMailReference(response); - return mailLoader.apply(mailDTO); + EnQueueId enQueueId = mailDTO.retrieveEnqueueId(); + Mail mail = mailLoader.apply(mailDTO); + return Pair.of(enQueueId, mail); } private MailReferenceDTO toMailReference(Delivery getResponse) throws MailQueue.MailQueueException { diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/EnqueuedItem.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/EnqueuedItem.java index 9991123..d693906 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/EnqueuedItem.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/EnqueuedItem.java @@ -23,7 +23,6 @@ import java.time.Instant; import java.util.Objects; import org.apache.james.blob.mail.MimeMessagePartsId; -import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey; import org.apache.mailet.Mail; import com.google.common.base.Preconditions; @@ -33,6 +32,11 @@ public class EnqueuedItem { interface Builder { @FunctionalInterface + interface RequireEnqueueId { + RequireMailQueueName enQueueId(EnQueueId id); + } + + @FunctionalInterface interface RequireMailQueueName { RequireMail mailQueueName(MailQueueName mailQueueName); } @@ -53,16 +57,20 @@ public class EnqueuedItem { } class ReadyToBuild { + private final EnQueueId enQueueId; private final MailQueueName mailQueueName; private final Mail mail; private final Instant enqueuedTime; private final MimeMessagePartsId partsId; - ReadyToBuild(MailQueueName mailQueueName, Mail mail, Instant enqueuedTime, MimeMessagePartsId partsId) { + ReadyToBuild(EnQueueId enQueueId, MailQueueName mailQueueName, Mail mail, Instant enqueuedTime, MimeMessagePartsId partsId) { + Preconditions.checkNotNull(enQueueId, "'enQueueId' is mandatory"); Preconditions.checkNotNull(mailQueueName, "'mailQueueName' is mandatory"); Preconditions.checkNotNull(mail, "'mail' is mandatory"); Preconditions.checkNotNull(enqueuedTime, "'enqueuedTime' is mandatory"); Preconditions.checkNotNull(partsId, "'partsId' is mandatory"); + + this.enQueueId = enQueueId; this.mailQueueName = mailQueueName; this.mail = mail; this.enqueuedTime = enqueuedTime; @@ -70,28 +78,31 @@ public class EnqueuedItem { } public EnqueuedItem build() { - return new EnqueuedItem(mailQueueName, mail, enqueuedTime, partsId); + return new EnqueuedItem(enQueueId, mailQueueName, mail, enqueuedTime, partsId); } } } - public static Builder.RequireMailQueueName builder() { - return queueName -> mail -> enqueuedTime -> partsId -> new Builder.ReadyToBuild(queueName, mail, enqueuedTime, partsId); + public static Builder.RequireEnqueueId builder() { + return enQueueId -> queueName -> mail -> enqueuedTime -> partsId -> new Builder.ReadyToBuild(enQueueId, queueName, mail, enqueuedTime, partsId); } + private final EnQueueId enQueueId; private final MailQueueName mailQueueName; private final Mail mail; - private final MailKey mailKey; private final Instant enqueuedTime; private final MimeMessagePartsId partsId; - EnqueuedItem(MailQueueName mailQueueName, Mail mail, Instant enqueuedTime, MimeMessagePartsId partsId) { + EnqueuedItem(EnQueueId enQueueId, MailQueueName mailQueueName, Mail mail, Instant enqueuedTime, MimeMessagePartsId partsId) { + this.enQueueId = enQueueId; this.mailQueueName = mailQueueName; this.mail = mail; this.enqueuedTime = enqueuedTime; this.partsId = partsId; + } - this.mailKey = MailKey.of(mail.getName()); + public EnQueueId getEnQueueId() { + return enQueueId; } public MailQueueName getMailQueueName() { @@ -110,18 +121,14 @@ public class EnqueuedItem { return partsId; } - public MailKey getMailKey() { - return mailKey; - } - @Override public final boolean equals(Object o) { if (o instanceof EnqueuedItem) { EnqueuedItem that = (EnqueuedItem) o; - return Objects.equals(this.mailQueueName, that.mailQueueName) + return Objects.equals(this.enQueueId, that.enQueueId) + && Objects.equals(this.mailQueueName, that.mailQueueName) && Objects.equals(this.mail, that.mail) - && Objects.equals(this.mailKey, that.mailKey) && Objects.equals(this.enqueuedTime, that.enqueuedTime) && Objects.equals(this.partsId, that.partsId); } @@ -130,6 +137,6 @@ public class EnqueuedItem { @Override public final int hashCode() { - return Objects.hash(mailQueueName, mail, mailKey, enqueuedTime, partsId); + return Objects.hash(enQueueId, mailQueueName, mail, enqueuedTime, partsId); } } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java index 7ca896d..9081acc 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java @@ -36,6 +36,7 @@ import org.apache.mailet.Mail; import com.fasterxml.jackson.core.JsonProcessingException; import com.github.fge.lambdas.Throwing; + import reactor.core.publisher.Mono; class Enqueuer { @@ -60,8 +61,9 @@ class Enqueuer { } void enQueue(Mail mail) throws MailQueue.MailQueueException { + EnQueueId enQueueId = EnQueueId.generate(); saveMail(mail) - .map(Throwing.<MimeMessagePartsId, EnqueuedItem>function(partsId -> publishReferenceToRabbit(mail, partsId)).sneakyThrow()) + .map(Throwing.<MimeMessagePartsId, EnqueuedItem>function(partsId -> publishReferenceToRabbit(enQueueId, mail, partsId)).sneakyThrow()) .flatMap(mailQueueView::storeMail) .thenEmpty(Mono.fromRunnable(enqueueMetric::increment)) .block(); @@ -75,10 +77,11 @@ class Enqueuer { } } - private EnqueuedItem publishReferenceToRabbit(Mail mail, MimeMessagePartsId partsId) throws MailQueue.MailQueueException { - rabbitClient.publish(name, getMailReferenceBytes(mail, partsId)); + private EnqueuedItem publishReferenceToRabbit(EnQueueId enQueueId, Mail mail, MimeMessagePartsId partsId) throws MailQueue.MailQueueException { + rabbitClient.publish(name, getMailReferenceBytes(enQueueId, mail, partsId)); return EnqueuedItem.builder() + .enQueueId(enQueueId) .mailQueueName(name) .mail(mail) .enqueuedTime(clock.instant()) @@ -86,9 +89,9 @@ class Enqueuer { .build(); } - private byte[] getMailReferenceBytes(Mail mail, MimeMessagePartsId partsId) throws MailQueue.MailQueueException { + private byte[] getMailReferenceBytes(EnQueueId enQueueId, Mail mail, MimeMessagePartsId partsId) throws MailQueue.MailQueueException { try { - MailReferenceDTO mailDTO = MailReferenceDTO.fromMail(mail, partsId); + MailReferenceDTO mailDTO = MailReferenceDTO.fromMail(enQueueId, mail, partsId); return mailReferenceSerializer.write(mailDTO); } catch (JsonProcessingException e) { throw new MailQueue.MailQueueException("Unable to serialize message", e); diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailReferenceDTO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailReferenceDTO.java index a80bee1..f119325 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailReferenceDTO.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailReferenceDTO.java @@ -52,8 +52,9 @@ import com.google.common.collect.ImmutableMap; class MailReferenceDTO { - static MailReferenceDTO fromMail(Mail mail, MimeMessagePartsId partsId) { + static MailReferenceDTO fromMail(EnQueueId enQueueId, Mail mail, MimeMessagePartsId partsId) { return new MailReferenceDTO( + enQueueId.serialize(), Optional.ofNullable(mail.getRecipients()).map(Collection::stream) .orElse(Stream.empty()) .map(MailAddress::asString) @@ -89,6 +90,7 @@ class MailReferenceDTO { .collect(Guavate.toImmutableMap(name, value)); } + private final String enQueueId; private final ImmutableList<String> recipients; private final String name; private final Optional<String> sender; @@ -103,7 +105,8 @@ class MailReferenceDTO { private final String bodyBlobId; @JsonCreator - private MailReferenceDTO(@JsonProperty("recipients") ImmutableList<String> recipients, + private MailReferenceDTO(@JsonProperty("enQueueId") String enQueueId, + @JsonProperty("recipients") ImmutableList<String> recipients, @JsonProperty("name") String name, @JsonProperty("sender") Optional<String> sender, @JsonProperty("state") String state, @@ -112,9 +115,10 @@ class MailReferenceDTO { @JsonProperty("attributes") ImmutableMap<String, String> attributes, @JsonProperty("remoteAddr") String remoteAddr, @JsonProperty("remoteHost") String remoteHost, - @JsonProperty("perRecipientHeaders") Map<String, HeadersDto> perRecipientHeaders, + @JsonProperty("perRecipientHeaders") Map<String, HeadersDto> perRecipientHeaders, @JsonProperty("headerBlobId") String headerBlobId, @JsonProperty("bodyBlobId") String bodyBlobId) { + this.enQueueId = enQueueId; this.recipients = recipients; this.name = name; this.sender = sender; @@ -129,6 +133,11 @@ class MailReferenceDTO { this.bodyBlobId = bodyBlobId; } + @JsonProperty("enQueueId") + public String getEnQueueId() { + return enQueueId; + } + @JsonProperty("recipients") Collection<String> getRecipients() { return recipients; @@ -189,6 +198,10 @@ class MailReferenceDTO { return bodyBlobId; } + EnQueueId retrieveEnqueueId() { + return EnQueueId.ofSerialized(enQueueId); + } + MailImpl toMailWithMimeMessage(MimeMessage mimeMessage) throws MessagingException { MailImpl.Builder builder = MailImpl.builder() .name(name) @@ -233,7 +246,8 @@ class MailReferenceDTO { if (o instanceof MailReferenceDTO) { MailReferenceDTO mailDTO = (MailReferenceDTO) o; - return Objects.equals(this.recipients, mailDTO.recipients) + return Objects.equals(this.enQueueId, mailDTO.enQueueId) + && Objects.equals(this.recipients, mailDTO.recipients) && Objects.equals(this.name, mailDTO.name) && Objects.equals(this.sender, mailDTO.sender) && Objects.equals(this.state, mailDTO.state) @@ -251,6 +265,6 @@ class MailReferenceDTO { @Override public final int hashCode() { - return Objects.hash(recipients, name, sender, state, errorMessage, lastUpdated, attributes, remoteAddr, remoteHost, perRecipientHeaders, headerBlobId, bodyBlobId); + return Objects.hash(enQueueId, recipients, name, sender, state, errorMessage, lastUpdated, attributes, remoteAddr, remoteHost, perRecipientHeaders, headerBlobId, bodyBlobId); } } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/DeleteCondition.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/DeleteCondition.java index 96060dd..4b640fa 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/DeleteCondition.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/DeleteCondition.java @@ -23,6 +23,7 @@ import java.util.Objects; import org.apache.commons.lang3.NotImplementedException; import org.apache.james.queue.api.ManageableMailQueue; +import org.apache.james.queue.rabbitmq.EnQueueId; import org.apache.mailet.Mail; import com.google.common.base.Preconditions; @@ -86,10 +87,6 @@ public interface DeleteCondition { this.name = name; } - public String getName() { - return name; - } - @Override public boolean shouldBeDeleted(Mail mail) { Preconditions.checkNotNull(mail); @@ -112,6 +109,24 @@ public interface DeleteCondition { } } + class WithEnqueueId implements DeleteCondition { + private final EnQueueId enQueueId; + + + public WithEnqueueId(EnQueueId enQueueId) { + this.enQueueId = enQueueId; + } + + public EnQueueId getEnQueueId() { + return enQueueId; + } + + @Override + public boolean shouldBeDeleted(Mail mail) { + throw new NotImplementedException("EnQueueId is not carried as a Mail property"); + } + } + class WithRecipient implements DeleteCondition { private final String recipientAsString; diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java index 93c79e4..a641484 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java @@ -20,9 +20,9 @@ package org.apache.james.queue.rabbitmq.view.api; import org.apache.james.queue.api.ManageableMailQueue; +import org.apache.james.queue.rabbitmq.EnQueueId; import org.apache.james.queue.rabbitmq.EnqueuedItem; import org.apache.james.queue.rabbitmq.MailQueueName; -import org.apache.mailet.Mail; import reactor.core.publisher.Mono; @@ -38,7 +38,7 @@ public interface MailQueueView { long delete(DeleteCondition deleteCondition); - Mono<Boolean> isPresent(Mail mail); + Mono<Boolean> isPresent(EnQueueId id); ManageableMailQueue.MailQueueIterator browse(); diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java index 9054e1b..baf19a3 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java @@ -44,6 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -138,7 +139,7 @@ public class CassandraMailQueueBrowser { private Flux<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) { return enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId) - .filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getMailKey())); + .filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getEnQueueId())); } private Flux<Slice> allSlicesStartingAt(Instant browseStart) { diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java index 168cc6c..55d563a 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java @@ -24,10 +24,9 @@ import java.util.concurrent.ThreadLocalRandom; import javax.inject.Inject; +import org.apache.james.queue.rabbitmq.EnQueueId; import org.apache.james.queue.rabbitmq.MailQueueName; import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration; -import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey; -import org.apache.mailet.Mail; import reactor.core.publisher.Mono; @@ -49,18 +48,14 @@ public class CassandraMailQueueMailDelete { this.configuration = configuration; } - Mono<Void> considerDeleted(Mail mail, MailQueueName mailQueueName) { - return considerDeleted(MailKey.fromMail(mail), mailQueueName); - } - - Mono<Void> considerDeleted(MailKey mailKey, MailQueueName mailQueueName) { + Mono<Void> considerDeleted(EnQueueId enQueueId, MailQueueName mailQueueName) { return deletedMailsDao - .markAsDeleted(mailQueueName, mailKey) + .markAsDeleted(mailQueueName, enQueueId) .doOnNext(ignored -> maybeUpdateBrowseStart(mailQueueName)); } - Mono<Boolean> isDeleted(Mail mail, MailQueueName mailQueueName) { - return deletedMailsDao.isDeleted(mailQueueName, MailKey.fromMail(mail)); + Mono<Boolean> isDeleted(EnQueueId enQueueId, MailQueueName mailQueueName) { + return deletedMailsDao.isDeleted(mailQueueName, enQueueId); } void updateBrowseStart(MailQueueName mailQueueName) { diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java index 85ae197..15fd0fc 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java @@ -22,6 +22,7 @@ package org.apache.james.queue.rabbitmq.view.cassandra; import javax.inject.Inject; import org.apache.james.queue.api.ManageableMailQueue; +import org.apache.james.queue.rabbitmq.EnQueueId; import org.apache.james.queue.rabbitmq.EnqueuedItem; import org.apache.james.queue.rabbitmq.MailQueueName; import org.apache.james.queue.rabbitmq.view.api.DeleteCondition; @@ -29,8 +30,6 @@ import org.apache.james.queue.rabbitmq.view.api.MailQueueView; import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration; import org.apache.james.queue.rabbitmq.view.cassandra.configuration.EventsourcingConfigurationManagement; import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext; -import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey; -import org.apache.mailet.Mail; import reactor.core.publisher.Mono; @@ -101,9 +100,9 @@ public class CassandraMailQueueView implements MailQueueView { @Override public long delete(DeleteCondition deleteCondition) { - if (deleteCondition instanceof DeleteCondition.WithName) { - DeleteCondition.WithName nameDeleteCondition = (DeleteCondition.WithName) deleteCondition; - delete(MailKey.of(nameDeleteCondition.getName())).block(); + if (deleteCondition instanceof DeleteCondition.WithEnqueueId) { + DeleteCondition.WithEnqueueId enQueueIdCondition = (DeleteCondition.WithEnqueueId) deleteCondition; + delete(enQueueIdCondition.getEnQueueId()).block(); return 1L; } return browseThenDelete(deleteCondition); @@ -113,19 +112,19 @@ public class CassandraMailQueueView implements MailQueueView { return cassandraMailQueueBrowser.browseReferences(mailQueueName) .map(EnqueuedItemWithSlicingContext::getEnqueuedItem) .filter(mailReference -> deleteCondition.shouldBeDeleted(mailReference.getMail())) - .flatMap(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getMail(), mailQueueName)) + .flatMap(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getEnQueueId(), mailQueueName)) .count() .doOnNext(ignored -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName)) .block(); } - private Mono<Void> delete(MailKey mailKey) { - return cassandraMailQueueMailDelete.considerDeleted(mailKey, mailQueueName); + private Mono<Void> delete(EnQueueId enQueueId) { + return cassandraMailQueueMailDelete.considerDeleted(enQueueId, mailQueueName); } @Override - public Mono<Boolean> isPresent(Mail mail) { - return cassandraMailQueueMailDelete.isDeleted(mail, mailQueueName) + public Mono<Boolean> isPresent(EnQueueId id) { + return cassandraMailQueueMailDelete.isDeleted(id, mailQueueName) .map(bool -> !bool); } } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewModule.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewModule.java index 9068f61..a160b9b 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewModule.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewModule.java @@ -25,6 +25,7 @@ import static com.datastax.driver.core.DataType.list; import static com.datastax.driver.core.DataType.map; import static com.datastax.driver.core.DataType.text; import static com.datastax.driver.core.DataType.timestamp; +import static com.datastax.driver.core.DataType.uuid; import static com.datastax.driver.core.schemabuilder.SchemaBuilder.frozen; import org.apache.james.backends.cassandra.components.CassandraModule; @@ -32,14 +33,15 @@ import org.apache.james.backends.cassandra.components.CassandraModule; public interface CassandraMailQueueViewModule { interface EnqueuedMailsTable { - String TABLE_NAME = "enqueuedMails"; + String TABLE_NAME = "enqueuedMailsV2"; String QUEUE_NAME = "queueName"; String TIME_RANGE_START = "timeRangeStart"; String BUCKET_ID = "bucketId"; String ENQUEUED_TIME = "enqueuedTime"; - String MAIL_KEY = "mailKey"; + String ENQUEUE_ID = "enqueueId"; + String NAME = "name"; String HEADER_BLOB_ID = "headerBlobId"; String BODY_BLOB_ID = "bodyBlobId"; String STATE = "state"; @@ -65,10 +67,10 @@ public interface CassandraMailQueueViewModule { } interface DeletedMailTable { - String TABLE_NAME = "deletedMails"; + String TABLE_NAME = "deletedMailsV2"; String QUEUE_NAME = "queueName"; - String MAIL_KEY = "mailKey"; + String ENQUEUE_ID = "enqueueId"; } CassandraModule MODULE = CassandraModule @@ -86,8 +88,9 @@ public interface CassandraMailQueueViewModule { .addPartitionKey(EnqueuedMailsTable.QUEUE_NAME, text()) .addPartitionKey(EnqueuedMailsTable.TIME_RANGE_START, timestamp()) .addPartitionKey(EnqueuedMailsTable.BUCKET_ID, cint()) - .addClusteringColumn(EnqueuedMailsTable.MAIL_KEY, text()) + .addClusteringColumn(EnqueuedMailsTable.ENQUEUE_ID, uuid()) .addColumn(EnqueuedMailsTable.ENQUEUED_TIME, timestamp()) + .addColumn(EnqueuedMailsTable.NAME, text()) .addColumn(EnqueuedMailsTable.STATE, text()) .addColumn(EnqueuedMailsTable.HEADER_BLOB_ID, text()) .addColumn(EnqueuedMailsTable.BODY_BLOB_ID, text()) @@ -115,7 +118,7 @@ public interface CassandraMailQueueViewModule { .options(options -> options) .statement(statement -> statement .addPartitionKey(DeletedMailTable.QUEUE_NAME, text()) - .addPartitionKey(DeletedMailTable.MAIL_KEY, text())) + .addPartitionKey(DeletedMailTable.ENQUEUE_ID, uuid())) .build(); } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java index eb64990..7f44c6f 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAO.java @@ -23,18 +23,19 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; -import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.DeletedMailTable.MAIL_KEY; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.DeletedMailTable.ENQUEUE_ID; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.DeletedMailTable.QUEUE_NAME; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.DeletedMailTable.TABLE_NAME; import javax.inject.Inject; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; +import org.apache.james.queue.rabbitmq.EnQueueId; import org.apache.james.queue.rabbitmq.MailQueueName; -import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; + import reactor.core.publisher.Mono; public class DeletedMailsDAO { @@ -53,31 +54,31 @@ public class DeletedMailsDAO { private PreparedStatement prepareInsert(Session session) { return session.prepare(insertInto(TABLE_NAME) .value(QUEUE_NAME, bindMarker(QUEUE_NAME)) - .value(MAIL_KEY, bindMarker(MAIL_KEY))); + .value(ENQUEUE_ID, bindMarker(ENQUEUE_ID))); } private PreparedStatement prepareSelectExist(Session session) { return session.prepare(select() .from(TABLE_NAME) .where(eq(QUEUE_NAME, bindMarker(QUEUE_NAME))) - .and(eq(MAIL_KEY, bindMarker(MAIL_KEY)))); + .and(eq(ENQUEUE_ID, bindMarker(ENQUEUE_ID)))); } - Mono<Void> markAsDeleted(MailQueueName mailQueueName, MailKey mailKey) { + Mono<Void> markAsDeleted(MailQueueName mailQueueName, EnQueueId enQueueId) { return executor.executeVoid(insertOne.bind() .setString(QUEUE_NAME, mailQueueName.asString()) - .setString(MAIL_KEY, mailKey.getMailKey())); + .setUUID(ENQUEUE_ID, enQueueId.asUUID())); } - Mono<Boolean> isDeleted(MailQueueName mailQueueName, MailKey mailKey) { + Mono<Boolean> isDeleted(MailQueueName mailQueueName, EnQueueId enQueueId) { return executor.executeReturnExists( selectOne.bind() .setString(QUEUE_NAME, mailQueueName.asString()) - .setString(MAIL_KEY, mailKey.getMailKey())); + .setUUID(ENQUEUE_ID, enQueueId.asUUID())); } - Mono<Boolean> isStillEnqueued(MailQueueName mailQueueName, MailKey mailKey) { - return isDeleted(mailQueueName, mailKey) + Mono<Boolean> isStillEnqueued(MailQueueName mailQueueName, EnQueueId enQueueId) { + return isDeleted(mailQueueName, enQueueId) .map(b -> !b); } } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java index dde1aa6..1c8627e 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java @@ -27,10 +27,11 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueV import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.BODY_BLOB_ID; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ENQUEUED_TIME; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ENQUEUE_ID; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ERROR_MESSAGE; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_BLOB_ID; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.LAST_UPDATED; -import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.MAIL_KEY; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.NAME; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.PER_RECIPIENT_SPECIFIC_HEADERS; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.QUEUE_NAME; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.RECIPIENTS; @@ -97,7 +98,8 @@ public class EnqueuedMailsDAO { .value(QUEUE_NAME, bindMarker(QUEUE_NAME)) .value(TIME_RANGE_START, bindMarker(TIME_RANGE_START)) .value(BUCKET_ID, bindMarker(BUCKET_ID)) - .value(MAIL_KEY, bindMarker(MAIL_KEY)) + .value(ENQUEUE_ID, bindMarker(ENQUEUE_ID)) + .value(NAME, bindMarker(NAME)) .value(HEADER_BLOB_ID, bindMarker(HEADER_BLOB_ID)) .value(BODY_BLOB_ID, bindMarker(BODY_BLOB_ID)) .value(ENQUEUED_TIME, bindMarker(ENQUEUED_TIME)) @@ -123,7 +125,8 @@ public class EnqueuedMailsDAO { .setTimestamp(TIME_RANGE_START, Date.from(slicingContext.getTimeRangeStart())) .setInt(BUCKET_ID, slicingContext.getBucketId().getValue()) .setTimestamp(ENQUEUED_TIME, Date.from(enqueuedItem.getEnqueuedTime())) - .setString(MAIL_KEY, mail.getName()) + .setUUID(ENQUEUE_ID, enqueuedItem.getEnQueueId().asUUID()) + .setString(NAME, mail.getName()) .setString(HEADER_BLOB_ID, mimeMessagePartsId.getHeaderBlobId().asString()) .setString(BODY_BLOB_ID, mimeMessagePartsId.getBodyBlobId().asString()) .setString(STATE, mail.getState()) diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java index a4e7eaa..004e9af 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java @@ -23,13 +23,14 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueV import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.BODY_BLOB_ID; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ENQUEUED_TIME; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ENQUEUE_ID; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ERROR_MESSAGE; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_BLOB_ID; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_NAME; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_TYPE; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_VALUE; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.LAST_UPDATED; -import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.MAIL_KEY; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.NAME; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.PER_RECIPIENT_SPECIFIC_HEADERS; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.QUEUE_NAME; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.RECIPIENTS; @@ -57,6 +58,7 @@ import org.apache.james.backends.cassandra.init.CassandraTypesProvider; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.core.MailAddress; +import org.apache.james.queue.rabbitmq.EnQueueId; import org.apache.james.queue.rabbitmq.EnqueuedItem; import org.apache.james.queue.rabbitmq.MailQueueName; import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices; @@ -79,6 +81,7 @@ public class EnqueuedMailsDaoUtil { static EnqueuedItemWithSlicingContext toEnqueuedMail(Row row, BlobId.Factory blobFactory) { MailQueueName queueName = MailQueueName.fromString(row.getString(QUEUE_NAME)); + EnQueueId enQueueId = EnQueueId.of(row.getUUID(ENQUEUE_ID)); Instant timeRangeStart = row.getTimestamp(TIME_RANGE_START).toInstant(); BucketedSlices.BucketId bucketId = BucketedSlices.BucketId.of(row.getInt(BUCKET_ID)); Instant enqueuedTime = row.getTimestamp(ENQUEUED_TIME).toInstant(); @@ -101,7 +104,7 @@ public class EnqueuedMailsDaoUtil { String remoteAddr = row.getString(REMOTE_ADDR); String remoteHost = row.getString(REMOTE_HOST); String errorMessage = row.getString(ERROR_MESSAGE); - String name = row.getString(MAIL_KEY); + String name = row.getString(NAME); Date lastUpdated = row.getTimestamp(LAST_UPDATED); Map<String, ByteBuffer> rawAttributes = row.getMap(ATTRIBUTES, String.class, ByteBuffer.class); PerRecipientHeaders perRecipientHeaders = fromHeaderMap(row.getMap(PER_RECIPIENT_SPECIFIC_HEADERS, String.class, UDTValue.class)); @@ -119,6 +122,7 @@ public class EnqueuedMailsDaoUtil { .addAttributes(toAttributes(rawAttributes)) .build(); EnqueuedItem enqueuedItem = EnqueuedItem.builder() + .enQueueId(enQueueId) .mailQueueName(queueName) .mail(mail) .enqueuedTime(enqueuedTime) diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/EnqueuedItemTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/EnqueuedItemTest.java index a3a205a..73c8302 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/EnqueuedItemTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/EnqueuedItemTest.java @@ -22,6 +22,7 @@ package org.apache.james.queue.rabbitmq; import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.time.Instant; + import javax.mail.MessagingException; import org.apache.james.blob.api.HashBlobId; @@ -56,8 +57,21 @@ class EnqueuedItemTest { } @Test + void buildShouldThrowWhenEnqueueIdIsNull() { + assertThatThrownBy(() -> EnqueuedItem.builder() + .enQueueId(null) + .mailQueueName(mailQueueName) + .mail(mail) + .enqueuedTime(enqueuedTime) + .mimeMessagePartsId(partsId) + .build()) + .isInstanceOf(NullPointerException.class); + } + + @Test void buildShouldThrowWhenMailQueueNameIsNull() { assertThatThrownBy(() -> EnqueuedItem.builder() + .enQueueId(EnQueueId.generate()) .mailQueueName(null) .mail(mail) .enqueuedTime(enqueuedTime) @@ -69,6 +83,7 @@ class EnqueuedItemTest { @Test void buildShouldThrowWhenMailIsNull() { assertThatThrownBy(() -> EnqueuedItem.builder() + .enQueueId(EnQueueId.generate()) .mailQueueName(mailQueueName) .mail(null) .enqueuedTime(enqueuedTime) @@ -80,6 +95,7 @@ class EnqueuedItemTest { @Test void buildShouldThrowWhenEnqueuedTimeIsNull() { assertThatThrownBy(() -> EnqueuedItem.builder() + .enQueueId(EnQueueId.generate()) .mailQueueName(mailQueueName) .mail(mail) .enqueuedTime(null) @@ -91,6 +107,7 @@ class EnqueuedItemTest { @Test void buildShouldThrowWhenMimeMessagePartsIdIsNull() { assertThatThrownBy(() -> EnqueuedItem.builder() + .enQueueId(EnQueueId.generate()) .mailQueueName(mailQueueName) .mail(mail) .enqueuedTime(enqueuedTime) diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/MailDTOTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/MailDTOTest.java index 712bcc5..2c98b1d 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/MailDTOTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/MailDTOTest.java @@ -46,6 +46,7 @@ import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; class MailDTOTest { + static final EnQueueId EN_QUEUE_ID = EnQueueId.ofSerialized("110e8400-e29b-11d4-a716-446655440000"); static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); static final Date LAST_UPDATED = Date.from(Instant.parse("2016-09-08T14:25:52.000Z")); @@ -85,6 +86,7 @@ class MailDTOTest { private MailReferenceDTO mailDTO1() throws MessagingException { return MailReferenceDTO.fromMail( + EN_QUEUE_ID, FakeMail.builder() .name("mail-name-558") .recipients(MailAddressFixture.RECIPIENT1, MailAddressFixture.RECIPIENT2) @@ -117,6 +119,7 @@ class MailDTOTest { mail.setState(null); mail.setLastUpdated(null); return MailReferenceDTO.fromMail( + EN_QUEUE_ID, mail, MimeMessagePartsId.builder() .headerBlobId(BLOB_ID_FACTORY.from("210e7136-ede3-44eb-9495-3ed816d6e23b")) diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index 03455ff..f6d57c6 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -231,13 +231,6 @@ public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQ } - @Test - @Override - @Disabled("JAMES-2794 This test never finishes") - public void enQueueShouldAcceptMailWithDuplicatedNames() { - - } - private void enqueueSomeMails(Function<Integer, String> namePattern, int emailCount) { IntStream.rangeClosed(1, emailCount) .forEach(Throwing.intConsumer(i -> enQueue(defaultMail() diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java index e21804f..73f0f27 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java @@ -23,8 +23,10 @@ import static org.assertj.core.api.Assertions.assertThat; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.backends.cassandra.components.CassandraModule; +import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule; +import org.apache.james.queue.rabbitmq.EnQueueId; import org.apache.james.queue.rabbitmq.MailQueueName; -import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -33,11 +35,14 @@ class DeletedMailsDAOTest { private static final MailQueueName OUT_GOING_1 = MailQueueName.fromString("OUT_GOING_1"); private static final MailQueueName OUT_GOING_2 = MailQueueName.fromString("OUT_GOING_2"); - private static final MailKey MAIL_KEY_1 = MailKey.of("mailkey1"); - private static final MailKey MAIL_KEY_2 = MailKey.of("mailkey2"); + private static final EnQueueId EN_QUEUE_ID_1 = EnQueueId.ofSerialized("110e8400-e29b-11d4-a716-446655440000"); + private static final EnQueueId EN_QUEUE_ID_2 = EnQueueId.ofSerialized("464765a0-e4e7-11e4-aba4-710c1de3782b"); @RegisterExtension - static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraMailQueueViewModule.MODULE); + static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension( + CassandraModule.aggregateModules( + CassandraSchemaVersionModule.MODULE, + CassandraMailQueueViewModule.MODULE)); private DeletedMailsDAO testee; @@ -49,14 +54,14 @@ class DeletedMailsDAOTest { @Test void markAsDeletedShouldWork() { Boolean isDeletedBeforeMark = testee - .isDeleted(OUT_GOING_1, MAIL_KEY_1) + .isDeleted(OUT_GOING_1, EN_QUEUE_ID_1) .block(); assertThat(isDeletedBeforeMark).isFalse(); - testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).block(); + testee.markAsDeleted(OUT_GOING_1, EN_QUEUE_ID_1).block(); Boolean isDeletedAfterMark = testee - .isDeleted(OUT_GOING_1, MAIL_KEY_1) + .isDeleted(OUT_GOING_1, EN_QUEUE_ID_1) .block(); assertThat(isDeletedAfterMark).isTrue(); @@ -64,10 +69,10 @@ class DeletedMailsDAOTest { @Test void checkDeletedShouldReturnFalseWhenTableDoesntContainBothMailQueueAndMailKey() { - testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_2).block(); + testee.markAsDeleted(OUT_GOING_2, EN_QUEUE_ID_2).block(); Boolean isDeleted = testee - .isDeleted(OUT_GOING_1, MAIL_KEY_1) + .isDeleted(OUT_GOING_1, EN_QUEUE_ID_1) .block(); assertThat(isDeleted).isFalse(); @@ -75,10 +80,10 @@ class DeletedMailsDAOTest { @Test void checkDeletedShouldReturnFalseWhenTableContainsMailQueueButNotMailKey() { - testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_2).block(); + testee.markAsDeleted(OUT_GOING_1, EN_QUEUE_ID_2).block(); Boolean isDeleted = testee - .isDeleted(OUT_GOING_1, MAIL_KEY_1) + .isDeleted(OUT_GOING_1, EN_QUEUE_ID_1) .block(); assertThat(isDeleted).isFalse(); @@ -86,10 +91,10 @@ class DeletedMailsDAOTest { @Test void checkDeletedShouldReturnFalseWhenTableContainsMailKeyButNotMailQueue() { - testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_1).block(); + testee.markAsDeleted(OUT_GOING_2, EN_QUEUE_ID_1).block(); Boolean isDeleted = testee - .isDeleted(OUT_GOING_1, MAIL_KEY_1) + .isDeleted(OUT_GOING_1, EN_QUEUE_ID_1) .block(); assertThat(isDeleted).isFalse(); @@ -97,10 +102,10 @@ class DeletedMailsDAOTest { @Test void checkDeletedShouldReturnTrueWhenTableContainsMailItem() { - testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).block(); + testee.markAsDeleted(OUT_GOING_1, EN_QUEUE_ID_1).block(); Boolean isDeleted = testee - .isDeleted(OUT_GOING_1, MAIL_KEY_1) + .isDeleted(OUT_GOING_1, EN_QUEUE_ID_1) .block(); assertThat(isDeleted).isTrue(); diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java index d855d80..7dce6e5 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java @@ -30,12 +30,12 @@ import org.apache.james.backends.cassandra.CassandraClusterExtension; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.mail.MimeMessagePartsId; +import org.apache.james.queue.rabbitmq.EnQueueId; import org.apache.james.queue.rabbitmq.EnqueuedItem; import org.apache.james.queue.rabbitmq.MailQueueName; import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.BucketId; import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice; import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext; -import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey; import org.apache.mailet.base.test.FakeMail; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -44,7 +44,8 @@ import org.junit.jupiter.api.extension.RegisterExtension; class EnqueuedMailsDaoTest { private static final MailQueueName OUT_GOING_1 = MailQueueName.fromString("OUT_GOING_1"); - private static final MailKey MAIL_KEY_1 = MailKey.of("mailkey1"); + private static final EnQueueId EN_QUEUE_ID = EnQueueId.ofSerialized("110e8400-e29b-11d4-a716-446655440000"); + private static final String NAME = "name"; private static int BUCKET_ID_VALUE = 10; private static final BucketId BUCKET_ID = BucketId.of(BUCKET_ID_VALUE); private static final Instant NOW = Instant.now(); @@ -75,9 +76,10 @@ class EnqueuedMailsDaoTest { void insertShouldWork() throws Exception { testee.insert(EnqueuedItemWithSlicingContext.builder() .enqueuedItem(EnqueuedItem.builder() + .enQueueId(EN_QUEUE_ID) .mailQueueName(OUT_GOING_1) .mail(FakeMail.builder() - .name(MAIL_KEY_1.getMailKey()) + .name(NAME) .build()) .enqueuedTime(NOW) .mimeMessagePartsId(MIME_MESSAGE_PARTS_ID) @@ -97,9 +99,10 @@ class EnqueuedMailsDaoTest { void selectEnqueuedMailsShouldWork() throws Exception { testee.insert(EnqueuedItemWithSlicingContext.builder() .enqueuedItem(EnqueuedItem.builder() + .enQueueId(EN_QUEUE_ID) .mailQueueName(OUT_GOING_1) .mail(FakeMail.builder() - .name(MAIL_KEY_1.getMailKey()) + .name(NAME) .build()) .enqueuedTime(NOW) .mimeMessagePartsId(MIME_MESSAGE_PARTS_ID) @@ -110,9 +113,10 @@ class EnqueuedMailsDaoTest { testee.insert(EnqueuedItemWithSlicingContext.builder() .enqueuedItem(EnqueuedItem.builder() + .enQueueId(EN_QUEUE_ID) .mailQueueName(OUT_GOING_1) .mail(FakeMail.builder() - .name(MAIL_KEY_1.getMailKey()) + .name(NAME) .build()) .enqueuedTime(NOW) .mimeMessagePartsId(MIME_MESSAGE_PARTS_ID) @@ -134,7 +138,8 @@ class EnqueuedMailsDaoTest { softly.assertThat(slicingContext.getTimeRangeStart()).isEqualTo(NOW); softly.assertThat(enqueuedItem.getMailQueueName()).isEqualTo(OUT_GOING_1); softly.assertThat(enqueuedItem.getEnqueuedTime()).isEqualTo(NOW); - softly.assertThat(enqueuedItem.getMailKey()).isEqualTo(MAIL_KEY_1); + softly.assertThat(enqueuedItem.getEnQueueId()).isEqualTo(EN_QUEUE_ID); + softly.assertThat(enqueuedItem.getMail().getName()).isEqualTo(NAME); softly.assertThat(enqueuedItem.getPartsId()).isEqualTo(MIME_MESSAGE_PARTS_ID); }); }); diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedItemWithSlicingContextTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedItemWithSlicingContextTest.java index 5eb63a1..f1c2729 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedItemWithSlicingContextTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedItemWithSlicingContextTest.java @@ -22,10 +22,12 @@ package org.apache.james.queue.rabbitmq.view.cassandra.model; import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.time.Instant; + import javax.mail.MessagingException; import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.mail.MimeMessagePartsId; +import org.apache.james.queue.rabbitmq.EnQueueId; import org.apache.james.queue.rabbitmq.EnqueuedItem; import org.apache.james.queue.rabbitmq.MailQueueName; import org.apache.mailet.base.test.FakeMail; @@ -40,7 +42,8 @@ class EnqueuedItemWithSlicingContextTest { private EnqueuedItemWithSlicingContextTest() throws MessagingException { enqueuedItem = EnqueuedItem.builder() - .mailQueueName(MailQueueName.fromString("mailQueueName")) + .enQueueId(EnQueueId.generate()) + .mailQueueName(MailQueueName.fromString("mailQueueName")) .mail(FakeMail.builder() .name("name") .build()) diff --git a/server/queue/queue-rabbitmq/src/test/resources/json/mail1.json b/server/queue/queue-rabbitmq/src/test/resources/json/mail1.json index b1d6d56..9bfbee0 100644 --- a/server/queue/queue-rabbitmq/src/test/resources/json/mail1.json +++ b/server/queue/queue-rabbitmq/src/test/resources/json/mail1.json @@ -1,4 +1,5 @@ { + "enQueueId": "110e8400-e29b-11d4-a716-446655440000", "recipients": [ "recipient1@localhost", "recipient2@localhost" diff --git a/server/queue/queue-rabbitmq/src/test/resources/json/mail_min.json b/server/queue/queue-rabbitmq/src/test/resources/json/mail_min.json index b2b8eea..21dcd7c 100644 --- a/server/queue/queue-rabbitmq/src/test/resources/json/mail_min.json +++ b/server/queue/queue-rabbitmq/src/test/resources/json/mail_min.json @@ -1,4 +1,5 @@ { + "enQueueId": "110e8400-e29b-11d4-a716-446655440000", "recipients":[], "name":"mail-name-558", "sender":null, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
