Repository: james-project
Updated Branches:
  refs/heads/master 9e66ddb13 -> b70ca1a7c


JAMES-2544 RabbitMQ browse `MailQueueView` API and cassandra table definitions


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/0523acf4
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/0523acf4
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/0523acf4

Branch: refs/heads/master
Commit: 0523acf435ff5985cc39a1e5b0454d0bd45a8285
Parents: 9e66ddb
Author: duc <dt...@linagora.com>
Authored: Wed Sep 12 20:12:47 2018 +0700
Committer: Benoit Tellier <btell...@linagora.com>
Committed: Wed Sep 26 09:21:47 2018 +0700

----------------------------------------------------------------------
 server/queue/queue-rabbitmq/pom.xml             |   1 -
 .../james/queue/rabbitmq/MailQueueName.java     |   6 +-
 .../queue/rabbitmq/view/api/MailQueueView.java  |  36 +++++
 .../CassandraMailQueueViewConfiguration.java    |  91 +++++++++++
 .../cassandra/CassandraMailQueueViewModule.java | 121 ++++++++++++++
 .../view/cassandra/model/BucketedSlices.java    | 129 +++++++++++++++
 .../view/cassandra/model/EnqueuedMail.java      | 156 +++++++++++++++++++
 .../rabbitmq/view/cassandra/model/MailKey.java  |  48 ++++++
 .../cassandra/model/BucketedSlicesTest.java     |  69 ++++++++
 9 files changed, 653 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/0523acf4/server/queue/queue-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml
index 2bc2a58..47b958b 100644
--- a/server/queue/queue-rabbitmq/pom.xml
+++ b/server/queue/queue-rabbitmq/pom.xml
@@ -50,7 +50,6 @@
         <dependency>
             <groupId>${james.groupId}</groupId>
             <artifactId>apache-james-backends-cassandra</artifactId>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>

http://git-wip-us.apache.org/repos/asf/james-project/blob/0523acf4/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java
index e059e3f..5c060de 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java
@@ -26,7 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 
-final class MailQueueName {
+public final class MailQueueName {
 
     static class WorkQueueName {
         static Optional<WorkQueueName> fromString(String name) {
@@ -114,7 +114,7 @@ final class MailQueueName {
     private static final String EXCHANGE_PREFIX = PREFIX + "-exchange-";
     @VisibleForTesting static final String WORKQUEUE_PREFIX = PREFIX + 
"-workqueue-";
 
-    static MailQueueName fromString(String name) {
+    public static MailQueueName fromString(String name) {
         Preconditions.checkNotNull(name);
         return new MailQueueName(name);
     }
@@ -130,7 +130,7 @@ final class MailQueueName {
         this.name = name;
     }
 
-    String asString() {
+    public String asString() {
         return name;
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/0523acf4/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
new file mode 100644
index 0000000..0d70239
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
@@ -0,0 +1,36 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.api;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.james.queue.api.ManageableMailQueue;
+import org.apache.mailet.Mail;
+
+public interface MailQueueView {
+
+    CompletableFuture<Void> storeMail(Mail mail);
+
+    CompletableFuture<Void> deleteMail(Mail mail);
+
+    ManageableMailQueue.MailQueueIterator browse();
+
+    long getSize();
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/0523acf4/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewConfiguration.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewConfiguration.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewConfiguration.java
new file mode 100644
index 0000000..b138887
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewConfiguration.java
@@ -0,0 +1,91 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra;
+
+import java.time.Duration;
+
+import com.google.common.base.Preconditions;
+
+public class CassandraMailQueueViewConfiguration {
+    interface Builder {
+        @FunctionalInterface
+        interface RequireBucketCount {
+            RequireUpdateBrowseStartPace bucketCount(int bucketCount);
+        }
+
+        @FunctionalInterface
+        interface RequireUpdateBrowseStartPace {
+            RequireSliceWindow updateBrowseStartPace(int 
updateBrowseStartPace);
+        }
+
+        @FunctionalInterface
+        interface RequireSliceWindow {
+            ReadyToBuild sliceWindow(Duration sliceWindow);
+        }
+
+        class ReadyToBuild {
+            private final int bucketCount;
+            private final int updateBrowseStartPace;
+            private final Duration sliceWindow;
+
+            private ReadyToBuild(int bucketCount, int updateBrowseStartPace, 
Duration sliceWindow) {
+                this.bucketCount = bucketCount;
+                this.updateBrowseStartPace = updateBrowseStartPace;
+                this.sliceWindow = sliceWindow;
+            }
+
+            public CassandraMailQueueViewConfiguration build() {
+                Preconditions.checkNotNull(sliceWindow, "'sliceWindow' is 
compulsory");
+                Preconditions.checkState(bucketCount > 0, "'bucketCount' needs 
to be a strictly positive integer");
+                Preconditions.checkState(updateBrowseStartPace > 0, 
"'updateBrowseStartPace' needs to be a strictly positive integer");
+
+                return new CassandraMailQueueViewConfiguration(bucketCount, 
updateBrowseStartPace, sliceWindow);
+            }
+        }
+    }
+
+    public static Builder.RequireBucketCount builder() {
+        return bucketCount -> updateBrowseStartPace -> sliceWindow -> new 
Builder.ReadyToBuild(bucketCount, updateBrowseStartPace, sliceWindow);
+    }
+
+    private final int bucketCount;
+    private final int updateBrowseStartPace;
+    private final Duration sliceWindow;
+
+    private CassandraMailQueueViewConfiguration(int bucketCount,
+                                                int updateBrowseStartPace,
+                                                Duration sliceWindow) {
+        this.bucketCount = bucketCount;
+        this.updateBrowseStartPace = updateBrowseStartPace;
+        this.sliceWindow = sliceWindow;
+    }
+
+    public int getUpdateBrowseStartPace() {
+        return updateBrowseStartPace;
+    }
+
+    public int getBucketCount() {
+        return bucketCount;
+    }
+
+    public Duration getSliceWindow() {
+        return sliceWindow;
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/0523acf4/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewModule.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewModule.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewModule.java
new file mode 100644
index 0000000..6460eb4
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewModule.java
@@ -0,0 +1,121 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra;
+
+import static com.datastax.driver.core.DataType.blob;
+import static com.datastax.driver.core.DataType.cint;
+import static com.datastax.driver.core.DataType.list;
+import static com.datastax.driver.core.DataType.map;
+import static com.datastax.driver.core.DataType.text;
+import static com.datastax.driver.core.DataType.timestamp;
+import static com.datastax.driver.core.schemabuilder.SchemaBuilder.frozen;
+
+import org.apache.james.backends.cassandra.components.CassandraModule;
+
+public interface CassandraMailQueueViewModule {
+
+    interface EnqueuedMailsTable {
+        String TABLE_NAME = "enqueuedMails";
+
+        String QUEUE_NAME = "queueName";
+        String TIME_RANGE_START = "timeRangeStart";
+        String BUCKET_ID = "bucketId";
+
+        String ENQUEUED_TIME = "enqueuedTime";
+        String MAIL_KEY = "mailKey";
+        String HEADER_BLOB_ID = "headerBlobId";
+        String BODY_BLOB_ID = "bodyBlobId";
+        String STATE = "state";
+        String SENDER = "sender";
+        String RECIPIENTS = "recipients";
+        String ATTRIBUTES = "attributes";
+        String ERROR_MESSAGE = "errorMessage";
+        String REMOTE_HOST = "remoteHost";
+        String REMOTE_ADDR = "remoteAddr";
+        String LAST_UPDATED = "lastUpdated";
+        String PER_RECIPIENT_SPECIFIC_HEADERS = "perRecipientSpecificHeaders";
+
+        String HEADER_TYPE = "header";
+        String HEADER_NAME = "headerName";
+        String HEADER_VALUE = "headerValue";
+    }
+
+    interface BrowseStartTable {
+        String TABLE_NAME = "browseStart";
+
+        String QUEUE_NAME = "queueName";
+        String BROWSE_START = "browseStart";
+    }
+
+    interface DeletedMailTable {
+        String TABLE_NAME = "deletedMails";
+
+        String QUEUE_NAME = "queueName";
+        String MAIL_KEY = "mailKey";
+    }
+
+    CassandraModule MODULE = CassandraModule
+        .type(EnqueuedMailsTable.HEADER_TYPE)
+            .statement(statement -> statement
+                .addColumn(EnqueuedMailsTable.HEADER_NAME, text())
+                .addColumn(EnqueuedMailsTable.HEADER_VALUE, text()))
+        .table(EnqueuedMailsTable.TABLE_NAME)
+        .comment("store enqueued mails, if a mail is enqueued into a mail 
queue, it also being stored in this table," +
+            " when a mail is dequeued from a mail queue, the record associated 
with that mail still available in this" +
+            " table and will not be deleted immediately regarding to the 
performance impacts," +
+            " but after some scheduled tasks")
+        .options(options -> options)
+        .statement(statement -> statement
+            .addPartitionKey(EnqueuedMailsTable.QUEUE_NAME, text())
+            .addPartitionKey(EnqueuedMailsTable.TIME_RANGE_START, timestamp())
+            .addPartitionKey(EnqueuedMailsTable.BUCKET_ID, cint())
+            .addClusteringColumn(EnqueuedMailsTable.MAIL_KEY, text())
+            .addColumn(EnqueuedMailsTable.ENQUEUED_TIME, timestamp())
+            .addColumn(EnqueuedMailsTable.STATE, text())
+            .addColumn(EnqueuedMailsTable.HEADER_BLOB_ID, text())
+            .addColumn(EnqueuedMailsTable.BODY_BLOB_ID, text())
+            .addColumn(EnqueuedMailsTable.ATTRIBUTES, map(text(), blob()))
+            .addColumn(EnqueuedMailsTable.ERROR_MESSAGE, text())
+            .addColumn(EnqueuedMailsTable.SENDER, text())
+            .addColumn(EnqueuedMailsTable.RECIPIENTS, list(text()))
+            .addColumn(EnqueuedMailsTable.REMOTE_HOST, text())
+            .addColumn(EnqueuedMailsTable.REMOTE_ADDR, text())
+            .addColumn(EnqueuedMailsTable.LAST_UPDATED, timestamp())
+            
.addUDTMapColumn(EnqueuedMailsTable.PER_RECIPIENT_SPECIFIC_HEADERS, text(), 
frozen(EnqueuedMailsTable.HEADER_TYPE)))
+
+        .table(BrowseStartTable.TABLE_NAME)
+        .comment("this table allows to find the starting point of iteration 
from the table: "
+            + EnqueuedMailsTable.TABLE_NAME + " in order to make a browse 
operations through mail queues")
+        .options(options -> options)
+        .statement(statement -> statement
+            .addPartitionKey(BrowseStartTable.QUEUE_NAME, text())
+            .addColumn(BrowseStartTable.BROWSE_START, timestamp()))
+
+        .table(DeletedMailTable.TABLE_NAME)
+        .comment("this table stores the dequeued mails, while browsing mail 
from table: "
+            + EnqueuedMailsTable.TABLE_NAME + " we need to filter out mails 
have been dequeued by checking their " +
+            "existence in this table")
+        .options(options -> options)
+        .statement(statement -> statement
+            .addPartitionKey(DeletedMailTable.QUEUE_NAME, text())
+            .addPartitionKey(DeletedMailTable.MAIL_KEY, text()))
+
+        .build();
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/0523acf4/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
new file mode 100644
index 0000000..f7d5ac1
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java
@@ -0,0 +1,129 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra.model;
+
+import java.time.Instant;
+import java.util.Objects;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
+
+import com.google.common.base.Preconditions;
+
+public class BucketedSlices {
+
+    public static class BucketId {
+
+        public static BucketId of(int bucketId) {
+            return new BucketId(bucketId);
+        }
+
+        private final int value;
+
+        private BucketId(int value) {
+            Preconditions.checkArgument(value >= 0, "sliceWindowSizeInSecond 
should not be negative");
+
+            this.value = value;
+        }
+
+        public int getValue() {
+            return value;
+        }
+
+        @Override
+        public final boolean equals(Object o) {
+            if (o instanceof BucketId) {
+                BucketId bucketId = (BucketId) o;
+
+                return Objects.equals(this.value, bucketId.value);
+            }
+            return false;
+        }
+
+        @Override
+        public final int hashCode() {
+            return Objects.hash(value);
+        }
+    }
+
+    public static class Slice {
+
+        public static Slice of(Instant sliceStartInstant, long 
sliceWindowSizeInSecond) {
+            return new Slice(sliceStartInstant, sliceWindowSizeInSecond);
+        }
+
+        public static Stream<Slice> allSlicesTill(Slice firstSlice, Instant 
endAt) {
+            long sliceCount = calculateSliceCount(firstSlice, endAt);
+            long startAtSeconds =  
firstSlice.getStartSliceInstant().getEpochSecond();
+            long sliceWindowSizeInSecond = 
firstSlice.getSliceWindowSizeInSecond();
+
+            return LongStream.range(0, sliceCount)
+                .map(slicePosition -> startAtSeconds + sliceWindowSizeInSecond 
* slicePosition)
+                .mapToObj(Instant::ofEpochSecond)
+                .map(sliceStartInstant -> Slice.of(sliceStartInstant, 
firstSlice.getSliceWindowSizeInSecond()));
+        }
+
+        private static long calculateSliceCount(Slice firstSlice, Instant 
endAt) {
+            long startAtSeconds =  
firstSlice.getStartSliceInstant().getEpochSecond();
+            long endAtSeconds = endAt.getEpochSecond();
+            long timeDiffInSecond = endAtSeconds - startAtSeconds;
+
+            if (timeDiffInSecond < 0) {
+                return 0;
+            } else {
+                return (timeDiffInSecond / firstSlice.sliceWindowSizeInSecond) 
+ 1;
+            }
+        }
+
+        private final Instant startSliceInstant;
+        private final long sliceWindowSizeInSecond;
+
+        private Slice(Instant startSliceInstant, long sliceWindowSizeInSecond) 
{
+            Preconditions.checkNotNull(startSliceInstant);
+            Preconditions.checkArgument(sliceWindowSizeInSecond > 0, 
"sliceWindowSizeInSecond should be positive");
+
+            this.startSliceInstant = startSliceInstant;
+            this.sliceWindowSizeInSecond = sliceWindowSizeInSecond;
+        }
+
+        public Instant getStartSliceInstant() {
+            return startSliceInstant;
+        }
+
+        public long getSliceWindowSizeInSecond() {
+            return sliceWindowSizeInSecond;
+        }
+
+        @Override
+        public final boolean equals(Object o) {
+            if (o instanceof Slice) {
+                Slice slice = (Slice) o;
+
+                return Objects.equals(this.sliceWindowSizeInSecond, 
slice.sliceWindowSizeInSecond)
+                    && Objects.equals(this.startSliceInstant, 
slice.startSliceInstant);
+            }
+            return false;
+        }
+
+        @Override
+        public final int hashCode() {
+            return Objects.hash(startSliceInstant, sliceWindowSizeInSecond);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/0523acf4/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java
new file mode 100644
index 0000000..6e022b3
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java
@@ -0,0 +1,156 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra.model;
+
+import java.time.Instant;
+import java.util.Comparator;
+import java.util.Objects;
+
+import org.apache.james.queue.rabbitmq.MailQueueName;
+import org.apache.mailet.Mail;
+
+public class EnqueuedMail {
+
+    public interface Builder {
+
+        @FunctionalInterface
+        interface RequireMail {
+            RequireBucketId mail(Mail mail);
+        }
+
+        @FunctionalInterface
+        interface RequireBucketId {
+            RequireTimeRangeStart bucketId(BucketedSlices.BucketId bucketId);
+        }
+
+        @FunctionalInterface
+        interface RequireTimeRangeStart {
+            RequireEnqueuedTime timeRangeStart(Instant timeRangeStart);
+        }
+
+        @FunctionalInterface
+        interface RequireEnqueuedTime {
+            RequireMailKey enqueuedTime(Instant enqueuedTime);
+        }
+
+        @FunctionalInterface
+        interface RequireMailKey {
+            RequireMailQueueName mailKey(MailKey mailKey);
+        }
+
+        @FunctionalInterface
+        interface RequireMailQueueName {
+            LastStage mailQueueName(MailQueueName mailQueueName);
+        }
+
+        class LastStage {
+            private Mail mail;
+            private BucketedSlices.BucketId bucketId;
+            private Instant timeRangeStart;
+            private Instant enqueuedTime;
+            private MailKey mailKey;
+            private MailQueueName mailQueueName;
+
+            private LastStage(Mail mail, BucketedSlices.BucketId bucketId,
+                              Instant timeRangeStart, Instant enqueuedTime,
+                              MailKey mailKey, MailQueueName mailQueueName) {
+                this.mail = mail;
+                this.bucketId = bucketId;
+                this.timeRangeStart = timeRangeStart;
+                this.enqueuedTime = enqueuedTime;
+                this.mailKey = mailKey;
+                this.mailQueueName = mailQueueName;
+            }
+
+            public EnqueuedMail build() {
+                return new EnqueuedMail(mail, bucketId, timeRangeStart, 
enqueuedTime, mailKey, mailQueueName);
+            }
+        }
+    }
+
+    public static Builder.RequireMail builder() {
+        return mail -> bucketId -> timeRangeStart -> enqueuedTime -> mailKey 
-> mailQueueName ->
+            new Builder.LastStage(mail, bucketId, timeRangeStart, 
enqueuedTime, mailKey, mailQueueName);
+    }
+
+    public static Comparator<EnqueuedMail> getEnqueuedTimeComparator() {
+        return Comparator.comparing(EnqueuedMail::getEnqueuedTime);
+    }
+
+    private final Mail mail;
+    private final BucketedSlices.BucketId bucketId;
+    private final Instant timeRangeStart;
+    private final Instant enqueuedTime;
+    private final MailKey mailKey;
+    private final MailQueueName mailQueueName;
+
+    private EnqueuedMail(Mail mail, BucketedSlices.BucketId bucketId, Instant 
timeRangeStart,
+                         Instant enqueuedTime, MailKey mailKey, MailQueueName 
mailQueueName) {
+        this.mail = mail;
+        this.bucketId = bucketId;
+        this.timeRangeStart = timeRangeStart;
+        this.enqueuedTime = enqueuedTime;
+        this.mailKey = mailKey;
+        this.mailQueueName = mailQueueName;
+    }
+
+    public Mail getMail() {
+        return mail;
+    }
+
+    public BucketedSlices.BucketId getBucketId() {
+        return bucketId;
+    }
+
+    public MailKey getMailKey() {
+        return mailKey;
+    }
+
+    public MailQueueName getMailQueueName() {
+        return mailQueueName;
+    }
+
+    public Instant getTimeRangeStart() {
+        return timeRangeStart;
+    }
+
+    public Instant getEnqueuedTime() {
+        return enqueuedTime;
+    }
+
+    @Override
+    public final boolean equals(Object o) {
+        if (o instanceof EnqueuedMail) {
+            EnqueuedMail that = (EnqueuedMail) o;
+
+            return Objects.equals(this.bucketId, that.bucketId)
+                    && Objects.equals(this.mail, that.mail)
+                    && Objects.equals(this.timeRangeStart, that.timeRangeStart)
+                    && Objects.equals(this.mailKey, that.mailKey)
+                    && Objects.equals(this.mailQueueName, that.mailQueueName);
+        }
+        return false;
+    }
+
+    @Override
+    public final int hashCode() {
+        return Objects.hash(mail, bucketId, timeRangeStart, mailKey, 
mailQueueName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/0523acf4/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKey.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKey.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKey.java
new file mode 100644
index 0000000..c39dc16
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKey.java
@@ -0,0 +1,48 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra.model;
+
+import org.apache.mailet.Mail;
+
+import com.google.common.base.Preconditions;
+
+public class MailKey {
+
+    public static MailKey fromMail(Mail mail) {
+        return of(mail.getName());
+    }
+
+    public static MailKey of(String mailKey) {
+        return new MailKey(mailKey);
+    }
+
+    private final String mailKey;
+
+    private MailKey(String mailKey) {
+        Preconditions.checkNotNull(mailKey);
+        Preconditions.checkArgument(!mailKey.isEmpty());
+
+        this.mailKey = mailKey;
+    }
+
+    public String getMailKey() {
+        return mailKey;
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/0523acf4/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
new file mode 100644
index 0000000..110d870
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java
@@ -0,0 +1,69 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra.model;
+
+import static 
org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Instant;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+class BucketedSlicesTest {
+    
+    private static final long ONE_HOUR_IN_SECONDS = 3600;
+
+    private static final Instant FIRST_SLICE_INSTANT = 
Instant.parse("2018-05-20T12:00:00.000Z");
+    private static final Instant FIRST_SLICE_INSTANT_NEXT_HOUR = 
FIRST_SLICE_INSTANT.plusSeconds(ONE_HOUR_IN_SECONDS);
+    private static final Instant FIRST_SLICE_INSTANT_NEXT_TWO_HOUR = 
FIRST_SLICE_INSTANT.plusSeconds(ONE_HOUR_IN_SECONDS * 2);
+
+    private static final Slice FIRST_SLICE = Slice.of(FIRST_SLICE_INSTANT, 
ONE_HOUR_IN_SECONDS);
+    private static final Slice FIRST_SLICE_NEXT_TWO_HOUR = 
Slice.of(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_IN_SECONDS);
+
+    @Nested
+    class Validation {
+    }
+
+    @Test
+    void allSlicesTillShouldReturnOnlyFirstSliceWhenEndAtInTheSameInterval() {
+        assertThat(Slice.allSlicesTill(FIRST_SLICE, 
FIRST_SLICE_INSTANT.plusSeconds(3599)))
+            .containsOnly(FIRST_SLICE);
+    }
+
+    @Test
+    void allSlicesTillShouldReturnAllSlicesBetweenStartAndEndAt() {
+        Stream<Slice> allSlices = Slice.allSlicesTill(FIRST_SLICE, 
FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(3599));
+
+        assertThat(allSlices)
+            .containsExactly(
+                FIRST_SLICE,
+                Slice.of(FIRST_SLICE_INSTANT_NEXT_HOUR, ONE_HOUR_IN_SECONDS),
+                Slice.of(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, 
ONE_HOUR_IN_SECONDS));
+    }
+
+    @Test
+    void allSlicesTillShouldReturnEmptyIfEndAtBeforeStartSlice() {
+        Stream<Slice> allSlices = 
Slice.allSlicesTill(FIRST_SLICE_NEXT_TWO_HOUR, FIRST_SLICE_INSTANT);
+
+        assertThat(allSlices).isEmpty();
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to