http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java new file mode 100644 index 0000000..6be8597 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java @@ -0,0 +1,196 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.cassandra; + +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ATTRIBUTES; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.BUCKET_ID; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ENQUEUED_TIME; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ERROR_MESSAGE; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_NAME; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_TYPE; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_VALUE; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.LAST_UPDATED; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.MAIL_KEY; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.PER_RECIPIENT_SPECIFIC_HEADERS; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.QUEUE_NAME; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.RECIPIENTS; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.REMOTE_ADDR; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.REMOTE_HOST; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.SENDER; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.STATE; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import javax.mail.internet.AddressException; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.backends.cassandra.init.CassandraTypesProvider; +import org.apache.james.core.MailAddress; +import org.apache.james.queue.rabbitmq.MailQueueName; +import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices; +import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedMail; +import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey; +import org.apache.james.server.core.MailImpl; +import org.apache.james.util.streams.Iterators; +import org.apache.mailet.Mail; +import org.apache.mailet.PerRecipientHeaders; + +import com.datastax.driver.core.Row; +import com.datastax.driver.core.UDTValue; +import com.github.fge.lambdas.Throwing; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +public class EnqueuedMailsDaoUtil { + + static EnqueuedMail toEnqueuedMail(Row row) { + MailQueueName queueName = MailQueueName.fromString(row.getString(QUEUE_NAME)); + Instant timeRangeStart = row.getTimestamp(TIME_RANGE_START).toInstant(); + BucketedSlices.BucketId bucketId = BucketedSlices.BucketId.of(row.getInt(BUCKET_ID)); + Instant enqueuedTime = row.getTimestamp(ENQUEUED_TIME).toInstant(); + + MailAddress sender = Optional.ofNullable(row.getString(SENDER)) + .map(Throwing.function(MailAddress::new)) + .orElse(null); + List<MailAddress> recipients = row.getList(RECIPIENTS, String.class) + .stream() + .map(Throwing.function(MailAddress::new)) + .collect(ImmutableList.toImmutableList()); + String state = row.getString(STATE); + String remoteAddr = row.getString(REMOTE_ADDR); + String remoteHost = row.getString(REMOTE_HOST); + String errorMessage = row.getString(ERROR_MESSAGE); + String name = row.getString(MAIL_KEY); + Date lastUpdated = row.getTimestamp(LAST_UPDATED); + Map<String, ByteBuffer> rawAttributes = row.getMap(ATTRIBUTES, String.class, ByteBuffer.class); + PerRecipientHeaders perRecipientHeaders = fromHeaderMap(row.getMap(PER_RECIPIENT_SPECIFIC_HEADERS, String.class, UDTValue.class)); + + MailImpl mail = MailImpl.builder() + .name(name) + .sender(sender) + .recipients(recipients) + .lastUpdated(lastUpdated) + .errorMessage(errorMessage) + .remoteHost(remoteHost) + .remoteAddr(remoteAddr) + .state(state) + .addAllHeadersForRecipients(perRecipientHeaders) + .attributes(toAttributes(rawAttributes)) + .build(); + + return EnqueuedMail.builder() + .mail(mail) + .bucketId(bucketId) + .timeRangeStart(timeRangeStart) + .enqueuedTime(enqueuedTime) + .mailKey(MailKey.of(name)) + .mailQueueName(queueName) + .build(); + } + + private static Map<String, Serializable> toAttributes(Map<String, ByteBuffer> rowAttributes) { + return rowAttributes.entrySet() + .stream() + .collect(ImmutableMap.toImmutableMap( + Map.Entry::getKey, + entry -> fromByteBuffer(entry.getValue()))); + } + + private static Serializable fromByteBuffer(ByteBuffer byteBuffer) { + try { + byte[] data = new byte[byteBuffer.remaining()]; + byteBuffer.get(data); + ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(data)); + return (Serializable) objectInputStream.readObject(); + } catch (IOException | ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + + private static PerRecipientHeaders fromHeaderMap(Map<String, UDTValue> rawMap) { + PerRecipientHeaders result = new PerRecipientHeaders(); + + rawMap.forEach((key, value) -> result.addHeaderForRecipient(PerRecipientHeaders.Header.builder() + .name(value.getString(HEADER_NAME)) + .value(value.getString(HEADER_VALUE)) + .build(), + toMailAddress(key))); + return result; + } + + private static MailAddress toMailAddress(String rawValue) { + try { + return new MailAddress(rawValue); + } catch (AddressException e) { + throw new RuntimeException(e); + } + } + + static ImmutableList<String> asStringList(Collection<MailAddress> mailAddresses) { + return mailAddresses.stream() + .map(MailAddress::asString) + .collect(ImmutableList.toImmutableList()); + } + + static ImmutableMap<String, ByteBuffer> toRawAttributeMap(Mail mail) { + return Iterators.toStream(mail.getAttributeNames()) + .map(name -> Pair.of(name, mail.getAttribute(name))) + .map(pair -> Pair.of(pair.getLeft(), toByteBuffer(pair.getRight()))) + .collect(ImmutableMap.toImmutableMap(Pair::getLeft, Pair::getRight)); + } + + private static ByteBuffer toByteBuffer(Serializable serializable) { + try { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + new ObjectOutputStream(outputStream).writeObject(serializable); + return ByteBuffer.wrap(outputStream.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + static ImmutableMap<String, UDTValue> toHeaderMap(CassandraTypesProvider cassandraTypesProvider, + PerRecipientHeaders perRecipientHeaders) { + return perRecipientHeaders.getHeadersByRecipient() + .asMap() + .entrySet() + .stream() + .flatMap(entry -> entry.getValue().stream().map(value -> Pair.of(entry.getKey(), value))) + .map(entry -> Pair.of(entry.getKey().asString(), + cassandraTypesProvider.getDefinedUserType(HEADER_TYPE) + .newValue() + .setString(HEADER_NAME, entry.getRight().getName()) + .setString(HEADER_VALUE, entry.getRight().getValue()))) + .collect(ImmutableMap.toImmutableMap(Pair::getLeft, Pair::getRight)); + } +}
http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java index f7d5ac1..1f5d3d5 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java @@ -19,6 +19,7 @@ package org.apache.james.queue.rabbitmq.view.cassandra.model; +import java.time.Duration; import java.time.Instant; import java.util.Objects; import java.util.stream.LongStream; @@ -37,7 +38,7 @@ public class BucketedSlices { private final int value; private BucketId(int value) { - Preconditions.checkArgument(value >= 0, "sliceWindowSizeInSecond should not be negative"); + Preconditions.checkArgument(value >= 0, "bucketId should not be negative"); this.value = value; } @@ -64,19 +65,19 @@ public class BucketedSlices { public static class Slice { - public static Slice of(Instant sliceStartInstant, long sliceWindowSizeInSecond) { - return new Slice(sliceStartInstant, sliceWindowSizeInSecond); + public static Slice of(Instant sliceStartInstant, Duration sliceWindowSize) { + return new Slice(sliceStartInstant, sliceWindowSize); } public static Stream<Slice> allSlicesTill(Slice firstSlice, Instant endAt) { long sliceCount = calculateSliceCount(firstSlice, endAt); long startAtSeconds = firstSlice.getStartSliceInstant().getEpochSecond(); - long sliceWindowSizeInSecond = firstSlice.getSliceWindowSizeInSecond(); + long sliceWindowSizeInSecond = firstSlice.getSliceWindowSize().getSeconds(); return LongStream.range(0, sliceCount) .map(slicePosition -> startAtSeconds + sliceWindowSizeInSecond * slicePosition) .mapToObj(Instant::ofEpochSecond) - .map(sliceStartInstant -> Slice.of(sliceStartInstant, firstSlice.getSliceWindowSizeInSecond())); + .map(sliceStartInstant -> Slice.of(sliceStartInstant, firstSlice.getSliceWindowSize())); } private static long calculateSliceCount(Slice firstSlice, Instant endAt) { @@ -87,27 +88,27 @@ public class BucketedSlices { if (timeDiffInSecond < 0) { return 0; } else { - return (timeDiffInSecond / firstSlice.sliceWindowSizeInSecond) + 1; + return (timeDiffInSecond / firstSlice.getSliceWindowSize().getSeconds()) + 1; } } private final Instant startSliceInstant; - private final long sliceWindowSizeInSecond; + private final Duration sliceWindowSize; - private Slice(Instant startSliceInstant, long sliceWindowSizeInSecond) { + private Slice(Instant startSliceInstant, Duration sliceWindowSize) { Preconditions.checkNotNull(startSliceInstant); - Preconditions.checkArgument(sliceWindowSizeInSecond > 0, "sliceWindowSizeInSecond should be positive"); + Preconditions.checkNotNull(sliceWindowSize); this.startSliceInstant = startSliceInstant; - this.sliceWindowSizeInSecond = sliceWindowSizeInSecond; + this.sliceWindowSize = sliceWindowSize; } public Instant getStartSliceInstant() { return startSliceInstant; } - public long getSliceWindowSizeInSecond() { - return sliceWindowSizeInSecond; + public Duration getSliceWindowSize() { + return sliceWindowSize; } @Override @@ -115,7 +116,7 @@ public class BucketedSlices { if (o instanceof Slice) { Slice slice = (Slice) o; - return Objects.equals(this.sliceWindowSizeInSecond, slice.sliceWindowSizeInSecond) + return Objects.equals(this.sliceWindowSize, slice.sliceWindowSize) && Objects.equals(this.startSliceInstant, slice.startSliceInstant); } return false; @@ -123,7 +124,7 @@ public class BucketedSlices { @Override public final int hashCode() { - return Objects.hash(startSliceInstant, sliceWindowSizeInSecond); + return Objects.hash(startSliceInstant, sliceWindowSize); } } } http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java index 6e022b3..93d8e8b 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java @@ -20,7 +20,6 @@ package org.apache.james.queue.rabbitmq.view.cassandra.model; import java.time.Instant; -import java.util.Comparator; import java.util.Objects; import org.apache.james.queue.rabbitmq.MailQueueName; @@ -90,10 +89,6 @@ public class EnqueuedMail { new Builder.LastStage(mail, bucketId, timeRangeStart, enqueuedTime, mailKey, mailQueueName); } - public static Comparator<EnqueuedMail> getEnqueuedTimeComparator() { - return Comparator.comparing(EnqueuedMail::getEnqueuedTime); - } - private final Mail mail; private final BucketedSlices.BucketId bucketId; private final Instant timeRangeStart; @@ -143,6 +138,7 @@ public class EnqueuedMail { return Objects.equals(this.bucketId, that.bucketId) && Objects.equals(this.mail, that.mail) && Objects.equals(this.timeRangeStart, that.timeRangeStart) + && Objects.equals(this.enqueuedTime, that.enqueuedTime) && Objects.equals(this.mailKey, that.mailKey) && Objects.equals(this.mailQueueName, that.mailQueueName); } @@ -151,6 +147,6 @@ public class EnqueuedMail { @Override public final int hashCode() { - return Objects.hash(mail, bucketId, timeRangeStart, mailKey, mailQueueName); + return Objects.hash(mail, bucketId, timeRangeStart, enqueuedTime, mailKey, mailQueueName); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKey.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKey.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKey.java index c39dc16..4f17a71 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKey.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKey.java @@ -19,6 +19,8 @@ package org.apache.james.queue.rabbitmq.view.cassandra.model; +import java.util.Objects; + import org.apache.mailet.Mail; import com.google.common.base.Preconditions; @@ -45,4 +47,19 @@ public class MailKey { public String getMailKey() { return mailKey; } + + @Override + public final boolean equals(Object o) { + if (o instanceof MailKey) { + MailKey mailKey1 = (MailKey) o; + + return Objects.equals(this.mailKey, mailKey1.mailKey); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(mailKey); + } } http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index f883883..92b384a 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -19,11 +19,23 @@ package org.apache.james.queue.rabbitmq; +import static java.time.temporal.ChronoUnit.HOURS; +import static org.apache.james.queue.api.Mails.defaultMail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeoutException; +import java.util.stream.IntStream; +import java.util.stream.Stream; import javax.mail.internet.MimeMessage; @@ -34,7 +46,8 @@ import org.apache.james.backend.rabbitmq.RabbitMQConfiguration; import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory; import org.apache.james.backend.rabbitmq.ReusableDockerRabbitMQExtension; import org.apache.james.backends.cassandra.CassandraCluster; -import org.apache.james.backends.cassandra.DockerCassandraExtension; +import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.backends.cassandra.components.CassandraModule; import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.api.Store; @@ -43,33 +56,51 @@ import org.apache.james.blob.cassandra.CassandraBlobsDAO; import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.blob.mail.MimeMessageStore; import org.apache.james.queue.api.MailQueue; -import org.apache.james.queue.api.MailQueueContract; import org.apache.james.queue.api.MailQueueMetricContract; import org.apache.james.queue.api.MailQueueMetricExtension; +import org.apache.james.queue.api.ManageableMailQueue; +import org.apache.james.queue.api.ManageableMailQueueContract; +import org.apache.james.queue.rabbitmq.view.api.MailQueueView; +import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewConfiguration; +import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule; +import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewTestFactory; +import org.apache.james.util.streams.Iterators; +import org.apache.mailet.Mail; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import com.github.fge.lambdas.Throwing; import com.nurkiewicz.asyncretry.AsyncRetryExecutor; -@ExtendWith({ReusableDockerRabbitMQExtension.class, DockerCassandraExtension.class}) -public class RabbitMQMailQueueTest implements MailQueueContract, MailQueueMetricContract { +@ExtendWith(ReusableDockerRabbitMQExtension.class) +public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQueueMetricContract { private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); + private static final int THREE_BUCKET_COUNT = 3; + private static final int UPDATE_BROWSE_START_PACE = 2; + private static final Duration ONE_HOUR_SLICE_WINDOW = Duration.ofHours(1); + private static final String SPOOL = "spool"; + private static final Instant IN_SLICE_1 = Instant.parse("2007-12-03T10:15:30.00Z"); + private static final Instant IN_SLICE_2 = IN_SLICE_1.plus(1, HOURS); + private static final Instant IN_SLICE_3 = IN_SLICE_1.plus(2, HOURS); + private static final Instant IN_SLICE_5 = IN_SLICE_1.plus(4, HOURS); + private static final Instant IN_SLICE_6 = IN_SLICE_1.plus(6, HOURS); - private static CassandraCluster cassandra; - private RabbitMQMailQueueFactory mailQueueFactory; + @RegisterExtension + static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules( + CassandraBlobModule.MODULE, + CassandraMailQueueViewModule.MODULE)); - @BeforeAll - static void setUpClass(DockerCassandraExtension.DockerCassandra dockerCassandra) { - cassandra = CassandraCluster.create(CassandraBlobModule.MODULE, dockerCassandra.getHost()); - } + private RabbitMQMailQueueFactory mailQueueFactory; + private Clock clock; @BeforeEach - void setup(DockerRabbitMQ rabbitMQ, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws IOException, TimeoutException, URISyntaxException { + void setup(DockerRabbitMQ rabbitMQ, CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws IOException, TimeoutException, URISyntaxException { CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION, BLOB_ID_FACTORY); Store<MimeMessage, MimeMessagePartsId> mimeMessageStore = MimeMessageStore.factory(blobsDAO).mimeMessageStore(); @@ -83,44 +114,217 @@ public class RabbitMQMailQueueTest implements MailQueueContract, MailQueueMetric .setHost(rabbitMQ.getHostIp()) .setPort(rabbitMQ.getAdminPort()) .build(); + clock = mock(Clock.class); + when(clock.instant()).thenReturn(Instant.parse("2007-12-03T10:15:30.00Z")); + ThreadLocalRandom random = ThreadLocalRandom.current(); + + MailQueueView mailQueueView = CassandraMailQueueViewTestFactory.factory(clock, random, cassandra.getConf(), cassandra.getTypesProvider(), + CassandraMailQueueViewConfiguration.builder() + .bucketCount(THREE_BUCKET_COUNT) + .updateBrowseStartPace(UPDATE_BROWSE_START_PACE) + .sliceWindow(ONE_HOUR_SLICE_WINDOW) + .build()) + .create(MailQueueName.fromString(SPOOL)); RabbitMQConfiguration rabbitMQConfiguration = RabbitMQConfiguration.builder() .amqpUri(amqpUri) .managementUri(rabbitManagementUri) .build(); - RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory( - rabbitMQConfiguration, - new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())); + RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory(rabbitMQConfiguration, + new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())); RabbitClient rabbitClient = new RabbitClient(new RabbitChannelPool(rabbitMQConnectionFactory)); RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory( - metricTestSystem.getSpyMetricFactory(), - rabbitClient, - mimeMessageStore, - BLOB_ID_FACTORY); + metricTestSystem.getSpyMetricFactory(), + rabbitClient, + mimeMessageStore, + BLOB_ID_FACTORY, + mailQueueView); + RabbitMQManagementApi mqManagementApi = new RabbitMQManagementApi(rabbitManagementUri, new RabbitMQManagementCredentials("guest", "guest".toCharArray())); mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory); } @AfterEach - void tearDown() { + void tearDown(CassandraCluster cassandra) { cassandra.clearTables(); } @AfterAll - static void tearDownClass() { + static void tearDownClass(CassandraCluster cassandra) { cassandra.closeCluster(); } @Override public MailQueue getMailQueue() { - return mailQueueFactory.createQueue("spool"); + return mailQueueFactory.createQueue(SPOOL); + } + + @Override + public ManageableMailQueue getManageableMailQueue() { + return mailQueueFactory.createQueue(SPOOL); + } + + @Test + void browseShouldReturnCurrentlyEnqueuedMailFromAllSlices() throws Exception { + ManageableMailQueue mailQueue = getManageableMailQueue(); + int emailCount = 5; + + when(clock.instant()).thenReturn(IN_SLICE_1); + enqueueMailsInSlice(1, emailCount); + + when(clock.instant()).thenReturn(IN_SLICE_2); + enqueueMailsInSlice(2, emailCount); + + when(clock.instant()).thenReturn(IN_SLICE_3); + enqueueMailsInSlice(3, emailCount); + + when(clock.instant()).thenReturn(IN_SLICE_5); + enqueueMailsInSlice(5, emailCount); + + when(clock.instant()).thenReturn(IN_SLICE_6); + Stream<String> names = Iterators.toStream(mailQueue.browse()) + .map(ManageableMailQueue.MailQueueItemView::getMail) + .map(Mail::getName); + + assertThat(names).containsExactly( + "1-1", "1-2", "1-3", "1-4", "1-5", + "2-1", "2-2", "2-3", "2-4", "2-5", + "3-1", "3-2", "3-3", "3-4", "3-5", + "5-1", "5-2", "5-3", "5-4", "5-5"); + } + + @Test + void browseAndDequeueShouldCombineWellWhenDifferentSlices() throws Exception { + ManageableMailQueue mailQueue = getManageableMailQueue(); + int emailCount = 5; + + when(clock.instant()).thenReturn(IN_SLICE_1); + enqueueMailsInSlice(1, emailCount); + + when(clock.instant()).thenReturn(IN_SLICE_2); + enqueueMailsInSlice(2, emailCount); + + when(clock.instant()).thenReturn(IN_SLICE_3); + enqueueMailsInSlice(3, emailCount); + + when(clock.instant()).thenReturn(IN_SLICE_5); + enqueueMailsInSlice(5, emailCount); + + when(clock.instant()).thenReturn(IN_SLICE_6); + + dequeueMails(5); + + dequeueMails(3); + MailQueue.MailQueueItem item2_4 = mailQueue.deQueue(); + item2_4.done(false); + dequeueMails(1); + + dequeueMails(5); + + Stream<String> names = Iterators.toStream(mailQueue.browse()) + .map(ManageableMailQueue.MailQueueItemView::getMail) + .map(Mail::getName); + + assertThat(names) + .containsExactly("2-4", "5-1", "5-2", "5-3", "5-4", "5-5"); + } + + @Disabled + @Override + public void clearShouldNotFailWhenBrowsingIterating() { + + } + + @Disabled + @Override + public void browseShouldNotFailWhenConcurrentClearWhenIterating() { + + } + + @Disabled + @Override + public void removeShouldNotFailWhenBrowsingIterating() { + + } + + @Disabled + @Override + public void browseShouldNotFailWhenConcurrentRemoveWhenIterating() { + + } + + @Disabled + @Override + public void removeByNameShouldRemoveSpecificEmail() { + + } + + @Disabled + @Override + public void removeBySenderShouldRemoveSpecificEmail() { + + } + + @Disabled + @Override + public void removeByRecipientShouldRemoveSpecificEmail() { + + } + + @Disabled + @Override + public void removeByRecipientShouldRemoveSpecificEmailWhenMultipleRecipients() { + + } + + @Disabled + @Override + public void removeByNameShouldNotFailWhenQueueIsEmpty() { + + } + + @Disabled + @Override + public void removeBySenderShouldNotFailWhenQueueIsEmpty() { + + } + + @Disabled + @Override + public void removeByRecipientShouldNotFailWhenQueueIsEmpty() { + + } + + @Disabled + @Override + public void clearShouldNotFailWhenQueueIsEmpty() { + + } + + @Disabled + @Override + public void clearShouldRemoveAllElements() { } @Disabled("RabbitMQ Mail Queue do not yet implement getSize()") @Override public void constructorShouldRegisterGetQueueSizeGauge(MailQueueMetricExtension.MailQueueMetricTestSystem testSystem) { + } + + private void enqueueMailsInSlice(int slice, int emailCount) { + ManageableMailQueue mailQueue = getManageableMailQueue(); + + IntStream.rangeClosed(1, emailCount) + .forEach(Throwing.intConsumer(bucketId -> mailQueue.enQueue(defaultMail() + .name(slice + "-" + bucketId) + .build()))); + } + private void dequeueMails(int times) { + ManageableMailQueue mailQueue = getManageableMailQueue(); + IntStream.rangeClosed(1, times) + .forEach(Throwing.intConsumer(bucketId -> mailQueue.deQueue().done(true))); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java index 18cca28..bf7d37e 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java @@ -36,11 +36,12 @@ import org.apache.james.backend.rabbitmq.RabbitMQConfiguration; import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory; import org.apache.james.backend.rabbitmq.ReusableDockerRabbitMQExtension; import org.apache.james.blob.api.HashBlobId; +import org.apache.james.metrics.api.NoopMetricFactory; import org.apache.james.blob.api.Store; import org.apache.james.blob.mail.MimeMessagePartsId; -import org.apache.james.metrics.api.NoopMetricFactory; import org.apache.james.queue.api.MailQueueFactory; import org.apache.james.queue.api.MailQueueFactoryContract; +import org.apache.james.queue.rabbitmq.view.api.MailQueueView; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; @@ -55,6 +56,7 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM @BeforeEach void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException, URISyntaxException { Store<MimeMessage, MimeMessagePartsId> mimeMessageStore = mock(Store.class); + MailQueueView mailQueueView = mock(MailQueueView.class); URI amqpUri = new URIBuilder() .setScheme("amqp") @@ -78,8 +80,7 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())); RabbitClient rabbitClient = new RabbitClient(new RabbitChannelPool(rabbitMQConnectionFactory)); - RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(new NoopMetricFactory(), rabbitClient, mimeMessageStore, BLOB_ID_FACTORY); - + RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(new NoopMetricFactory(), rabbitClient, mimeMessageStore, BLOB_ID_FACTORY, mailQueueView); RabbitMQManagementApi mqManagementApi = new RabbitMQManagementApi(rabbitManagementUri, new RabbitMQManagementCredentials("guest", "guest".toCharArray())); mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory); } @@ -88,4 +89,4 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM public MailQueueFactory<RabbitMQMailQueue> getMailQueueFactory() { return mailQueueFactory; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java new file mode 100644 index 0000000..ef844c6 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java @@ -0,0 +1,86 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.cassandra; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Instant; +import java.util.Optional; + +import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.queue.rabbitmq.MailQueueName; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class BrowseStartDAOTest { + + private static final MailQueueName OUT_GOING_1 = MailQueueName.fromString("OUT_GOING_1"); + private static final MailQueueName OUT_GOING_2 = MailQueueName.fromString("OUT_GOING_2"); + private static final Instant NOW = Instant.now(); + private static final Instant NOW_PLUS_TEN_SECONDS = NOW.plusSeconds(10); + + @RegisterExtension + static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraMailQueueViewModule.MODULE); + + private BrowseStartDAO testee; + + @BeforeEach + void setUp(CassandraCluster cassandra) { + testee = new BrowseStartDAO(cassandra.getConf()); + } + + @Test + void findBrowseStartShouldReturnEmptyWhenTableDoesntContainQueueName() { + testee.updateBrowseStart(OUT_GOING_1, NOW).join(); + + Optional<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2).join(); + assertThat(firstEnqueuedItemFromQueue2) + .isEmpty(); + } + + @Test + void findBrowseStartShouldReturnInstantWhenTableContainsQueueName() { + testee.updateBrowseStart(OUT_GOING_1, NOW).join(); + testee.updateBrowseStart(OUT_GOING_2, NOW).join(); + + Optional<Instant> firstEnqueuedItemFromQueue2 = testee.findBrowseStart(OUT_GOING_2).join(); + assertThat(firstEnqueuedItemFromQueue2) + .isNotEmpty(); + } + + @Test + void updateFirstEnqueuedTimeShouldWork() { + testee.updateBrowseStart(OUT_GOING_1, NOW).join(); + + assertThat(testee.selectOne(OUT_GOING_1).join()) + .isNotEmpty(); + } + + @Test + void insertInitialBrowseStartShouldInsertFirstInstant() { + testee.insertInitialBrowseStart(OUT_GOING_1, NOW).join(); + testee.insertInitialBrowseStart(OUT_GOING_1, NOW_PLUS_TEN_SECONDS).join(); + + assertThat(testee.findBrowseStart(OUT_GOING_1).join()) + .contains(NOW); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java new file mode 100644 index 0000000..95733c2 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java @@ -0,0 +1,48 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.cassandra; + +import java.time.Clock; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.james.backends.cassandra.init.CassandraTypesProvider; +import org.apache.james.backends.cassandra.utils.CassandraUtils; + +import com.datastax.driver.core.Session; + +public class CassandraMailQueueViewTestFactory { + + public static CassandraMailQueueView.Factory factory(Clock clock, ThreadLocalRandom random, Session session, + CassandraTypesProvider typesProvider, + CassandraMailQueueViewConfiguration configuration) { + EnqueuedMailsDAO enqueuedMailsDao = new EnqueuedMailsDAO(session, CassandraUtils.WITH_DEFAULT_CONFIGURATION, typesProvider); + BrowseStartDAO browseStartDao = new BrowseStartDAO(session); + DeletedMailsDAO deletedMailsDao = new DeletedMailsDAO(session); + + CassandraMailQueueBrowser cassandraMailQueueBrowser = new CassandraMailQueueBrowser(browseStartDao, deletedMailsDao, enqueuedMailsDao, configuration, clock); + CassandraMailQueueMailStore cassandraMailQueueMailStore = new CassandraMailQueueMailStore(enqueuedMailsDao, browseStartDao, configuration, clock); + CassandraMailQueueMailDelete cassandraMailQueueMailDelete = new CassandraMailQueueMailDelete(deletedMailsDao, browseStartDao, cassandraMailQueueBrowser, configuration, random); + + return new CassandraMailQueueView.Factory( + cassandraMailQueueMailStore, + cassandraMailQueueBrowser, + cassandraMailQueueMailDelete); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java new file mode 100644 index 0000000..d9dc69a --- /dev/null +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/DeletedMailsDAOTest.java @@ -0,0 +1,108 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.cassandra; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.queue.rabbitmq.MailQueueName; +import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class DeletedMailsDAOTest { + + private static final MailQueueName OUT_GOING_1 = MailQueueName.fromString("OUT_GOING_1"); + private static final MailQueueName OUT_GOING_2 = MailQueueName.fromString("OUT_GOING_2"); + private static final MailKey MAIL_KEY_1 = MailKey.of("mailkey1"); + private static final MailKey MAIL_KEY_2 = MailKey.of("mailkey2"); + + @RegisterExtension + static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraMailQueueViewModule.MODULE); + + private DeletedMailsDAO testee; + + @BeforeEach + void setUp(CassandraCluster cassandra) { + testee = new DeletedMailsDAO(cassandra.getConf()); + } + + @Test + void markAsDeletedShouldWork() { + Boolean isDeletedBeforeMark = testee + .isDeleted(OUT_GOING_1, MAIL_KEY_1) + .join(); + assertThat(isDeletedBeforeMark).isFalse(); + + testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).join(); + + Boolean isDeletedAfterMark = testee + .isDeleted(OUT_GOING_1, MAIL_KEY_1) + .join(); + + assertThat(isDeletedAfterMark).isTrue(); + } + + @Test + void checkDeletedShouldReturnFalseWhenTableDoesntContainBothMailQueueAndMailKey() { + testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_2).join(); + + Boolean isDeleted = testee + .isDeleted(OUT_GOING_1, MAIL_KEY_1) + .join(); + + assertThat(isDeleted).isFalse(); + } + + @Test + void checkDeletedShouldReturnFalseWhenTableContainsMailQueueButNotMailKey() { + testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_2).join(); + + Boolean isDeleted = testee + .isDeleted(OUT_GOING_1, MAIL_KEY_1) + .join(); + + assertThat(isDeleted).isFalse(); + } + + @Test + void checkDeletedShouldReturnFalseWhenTableContainsMailKeyButNotMailQueue() { + testee.markAsDeleted(OUT_GOING_2, MAIL_KEY_1).join(); + + Boolean isDeleted = testee + .isDeleted(OUT_GOING_1, MAIL_KEY_1) + .join(); + + assertThat(isDeleted).isFalse(); + } + + @Test + void checkDeletedShouldReturnTrueWhenTableContainsMailItem() { + testee.markAsDeleted(OUT_GOING_1, MAIL_KEY_1).join(); + + Boolean isDeleted = testee + .isDeleted(OUT_GOING_1, MAIL_KEY_1) + .join(); + + assertThat(isDeleted).isTrue(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java new file mode 100644 index 0000000..94a80a3 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java @@ -0,0 +1,126 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.cassandra; + +import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.BucketId; +import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice; +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.time.Instant; +import java.util.stream.Stream; + +import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.backends.cassandra.utils.CassandraUtils; +import org.apache.james.queue.rabbitmq.MailQueueName; +import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedMail; +import org.apache.james.queue.rabbitmq.view.cassandra.model.MailKey; +import org.apache.mailet.base.test.FakeMail; +import org.assertj.core.api.SoftAssertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class EnqueuedMailsDaoTest { + + private static final MailQueueName OUT_GOING_1 = MailQueueName.fromString("OUT_GOING_1"); + private static final MailKey MAIL_KEY_1 = MailKey.of("mailkey1"); + private static int BUCKET_ID_VALUE = 10; + private static final BucketId BUCKET_ID = BucketId.of(BUCKET_ID_VALUE); + private static final Instant NOW = Instant.now(); + private static final Slice SLICE_OF_NOW = Slice.of(NOW, Duration.ofSeconds(100)); + + @RegisterExtension + static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraMailQueueViewModule.MODULE); + + private EnqueuedMailsDAO testee; + + @BeforeEach + void setUp(CassandraCluster cassandra) { + testee = new EnqueuedMailsDAO( + cassandra.getConf(), + CassandraUtils.WITH_DEFAULT_CONFIGURATION, + cassandra.getTypesProvider()); + } + + @Test + void insertShouldWork() throws Exception { + testee.insert(EnqueuedMail.builder() + .mail(FakeMail.builder() + .name(MAIL_KEY_1.getMailKey()) + .build()) + .bucketId(BucketId.of(BUCKET_ID_VALUE)) + .timeRangeStart(NOW) + .enqueuedTime(NOW) + .mailKey(MAIL_KEY_1) + .mailQueueName(OUT_GOING_1) + .build()) + .join(); + + Stream<EnqueuedMail> selectedEnqueuedMails = testee + .selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID) + .join(); + + assertThat(selectedEnqueuedMails).hasSize(1); + } + + @Test + void selectEnqueuedMailsShouldWork() throws Exception { + testee.insert(EnqueuedMail.builder() + .mail(FakeMail.builder() + .name(MAIL_KEY_1.getMailKey()) + .build()) + .bucketId(BucketId.of(BUCKET_ID_VALUE)) + .timeRangeStart(NOW) + .enqueuedTime(NOW) + .mailKey(MAIL_KEY_1) + .mailQueueName(OUT_GOING_1) + .build()) + .join(); + + testee.insert(EnqueuedMail.builder() + .mail(FakeMail.builder() + .name(MAIL_KEY_1.getMailKey()) + .build()) + .bucketId(BucketId.of(BUCKET_ID_VALUE + 1)) + .timeRangeStart(NOW) + .enqueuedTime(NOW) + .mailKey(MAIL_KEY_1) + .mailQueueName(OUT_GOING_1) + .build()) + .join(); + + Stream<EnqueuedMail> selectedEnqueuedMails = testee.selectEnqueuedMails(OUT_GOING_1, SLICE_OF_NOW, BUCKET_ID) + .join(); + + assertThat(selectedEnqueuedMails) + .hasSize(1) + .hasOnlyOneElementSatisfying(selectedEnqueuedMail -> { + SoftAssertions softly = new SoftAssertions(); + softly.assertThat(selectedEnqueuedMail.getMailQueueName()).isEqualTo(OUT_GOING_1); + softly.assertThat(selectedEnqueuedMail.getBucketId()).isEqualTo(BUCKET_ID); + softly.assertThat(selectedEnqueuedMail.getTimeRangeStart()).isEqualTo(NOW); + softly.assertThat(selectedEnqueuedMail.getEnqueuedTime()).isEqualTo(NOW); + softly.assertThat(selectedEnqueuedMail.getMailKey()).isEqualTo(MAIL_KEY_1); + softly.assertAll(); + }); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java index 110d870..5386050 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java @@ -21,13 +21,17 @@ package org.apache.james.queue.rabbitmq.view.cassandra.model; import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.time.Duration; import java.time.Instant; import java.util.stream.Stream; -import org.junit.jupiter.api.Nested; +import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.BucketId; import org.junit.jupiter.api.Test; +import nl.jqno.equalsverifier.EqualsVerifier; + class BucketedSlicesTest { private static final long ONE_HOUR_IN_SECONDS = 3600; @@ -36,28 +40,64 @@ class BucketedSlicesTest { private static final Instant FIRST_SLICE_INSTANT_NEXT_HOUR = FIRST_SLICE_INSTANT.plusSeconds(ONE_HOUR_IN_SECONDS); private static final Instant FIRST_SLICE_INSTANT_NEXT_TWO_HOUR = FIRST_SLICE_INSTANT.plusSeconds(ONE_HOUR_IN_SECONDS * 2); - private static final Slice FIRST_SLICE = Slice.of(FIRST_SLICE_INSTANT, ONE_HOUR_IN_SECONDS); - private static final Slice FIRST_SLICE_NEXT_TWO_HOUR = Slice.of(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_IN_SECONDS); + private static final Duration ONE_HOUR_SLICE_WINDOW = Duration.ofSeconds(ONE_HOUR_IN_SECONDS); + private static final Slice FIRST_SLICE = Slice.of(FIRST_SLICE_INSTANT, ONE_HOUR_SLICE_WINDOW); + private static final Slice FIRST_SLICE_NEXT_TWO_HOUR = Slice.of(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, Duration.ofSeconds(ONE_HOUR_IN_SECONDS)); + + @Test + void bucketIdShouldMatchBeanContract() { + EqualsVerifier.forClass(BucketId.class) + .verify(); + } + + @Test + void sliceShouldMatchBeanContract() { + EqualsVerifier.forClass(Slice.class) + .verify(); + } - @Nested - class Validation { + @Test + void bucketIdShouldThrowWhenValueIsNegative() { + assertThatThrownBy(() -> BucketId.of(-1)) + .isInstanceOf(IllegalArgumentException.class); } @Test void allSlicesTillShouldReturnOnlyFirstSliceWhenEndAtInTheSameInterval() { - assertThat(Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT.plusSeconds(3599))) + assertThat(Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT.plusSeconds(ONE_HOUR_IN_SECONDS - 1))) .containsOnly(FIRST_SLICE); } @Test void allSlicesTillShouldReturnAllSlicesBetweenStartAndEndAt() { - Stream<Slice> allSlices = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(3599)); + Stream<Slice> allSlices = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1)); assertThat(allSlices) .containsExactly( FIRST_SLICE, - Slice.of(FIRST_SLICE_INSTANT_NEXT_HOUR, ONE_HOUR_IN_SECONDS), - Slice.of(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_IN_SECONDS)); + Slice.of(FIRST_SLICE_INSTANT_NEXT_HOUR, ONE_HOUR_SLICE_WINDOW), + Slice.of(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_SLICE_WINDOW)); + } + + @Test + void allSlicesTillShouldReturnSameSlicesWhenEndAtsAreInTheSameInterval() { + Stream<Slice> allSlicesEndAtTheStartOfWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR); + Stream<Slice> allSlicesEndAtTheMiddleOfWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(1000)); + Stream<Slice> allSlicesEndAtTheEndWindow = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(ONE_HOUR_IN_SECONDS - 1)); + + Slice [] allSlicesInThreeHours = { + FIRST_SLICE, + Slice.of(FIRST_SLICE_INSTANT_NEXT_HOUR, ONE_HOUR_SLICE_WINDOW), + Slice.of(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_SLICE_WINDOW)}; + + assertThat(allSlicesEndAtTheStartOfWindow) + .containsExactly(allSlicesInThreeHours); + + assertThat(allSlicesEndAtTheMiddleOfWindow) + .containsExactly(allSlicesInThreeHours); + + assertThat(allSlicesEndAtTheEndWindow) + .containsExactly(allSlicesInThreeHours); } @Test http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMailTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMailTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMailTest.java new file mode 100644 index 0000000..fb0f487 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMailTest.java @@ -0,0 +1,33 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.cassandra.model; + +import org.junit.jupiter.api.Test; + +import nl.jqno.equalsverifier.EqualsVerifier; + +class EnqueuedMailTest { + + @Test + void shouldMatchBeanContract() { + EqualsVerifier.forClass(EnqueuedMail.class) + .verify(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/3a53806a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKeyTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKeyTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKeyTest.java new file mode 100644 index 0000000..c1a854e --- /dev/null +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKeyTest.java @@ -0,0 +1,33 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.cassandra.model; + +import org.junit.jupiter.api.Test; + +import nl.jqno.equalsverifier.EqualsVerifier; + +class MailKeyTest { + + @Test + void shouldMatchBeanContract() { + EqualsVerifier.forClass(MailKey.class) + .verify(); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org