This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5b3080257868e7ee8f9304754ac9de6fe9fabaf1
Author: Benoit Tellier <[email protected]>
AuthorDate: Mon Apr 13 12:03:17 2020 +0700

    [ADR] Distributed Mail Queue
---
 src/adr/0031-distributed-mail-queue.md | 118 +++++++++++++++++++++++++++++++++
 1 file changed, 118 insertions(+)

diff --git a/src/adr/0031-distributed-mail-queue.md b/src/adr/0031-distributed-mail-queue.md
new file mode 100644
index 0000000..42c6691
--- /dev/null
+++ b/src/adr/0031-distributed-mail-queue.md
@@ -0,0 +1,118 @@
+# 31. Distributed Mail Queue
+
+Date: 2020-04-13
+
+## Status
+
+Accepted (lazy consensus)
+
+## Context
+
+MailQueue is a central component of SMTP infrastructure allowing asynchronous mail processing. This enables a short
+SMTP reply time despite a potentially longer mail processing time. It also works as a buffer during SMTP peak workload
+so that the server is not overloaded.
+
+Furthermore, when used as a Mail Exchange server (MX), the ability to add delays to be observed before dequeuing elements
+allows, among others:
+
+ - Delaying retries upon MX delivery failure to a remote site.
+ - Throttling, which could be helpful for not being considered a spammer.
+
+A mailqueue also enables advanced administration operations like traffic review, discarding emails, resetting wait
+delays, purging the queue, etc.
+
+The Spring implementation and the non-distributed implementations rely on an embedded ActiveMQ to implement the MailQueue.
+Emails are stored in a local file system. An administrator wishing to administer the mailQueue thus needs
+to interact with each James server, which is impractical in a distributed setup.
+
+Distributed James relies on the following third-party software (among others):
+
+ - **RabbitMQ** for messaging. Good at holding a queue, however some advanced administrative operations can't be
+implemented with this component alone. This is the case for `browse`, `getSize` and `arbitrary mail removal`.
+ - **Cassandra** is the metadata database. Because deletes are implemented with **tombstones**, using it as a queue is a well-known anti-pattern.
+ - **ObjectStorage** (Swift or S3) holds byte content.
+
+## Decision
+
+Distributed James should ship a distributed MailQueue composing the following software with the following
+responsibilities:
+
+ - **RabbitMQ** for messaging. A rabbitMQ consumer will trigger dequeue operations.
+ - A time series projection of the queue content (a time-ordered list of mail metadata) will be maintained in **Cassandra** (see below). Time series avoid the
+aforementioned tombstone anti-pattern, and no polling is performed on this projection.
+ - **ObjectStorage** (Swift or S3) holds large byte content. This avoids overwhelming other software which does not scale
+ as well in terms of input/output operations per second.
+ 
+Here are details of the tables composing Cassandra MailQueue View data-model:
+
+ - **enqueuedMailsV3** holds the time series. The primary key holds the queue name, the (rounded) time of enqueue
+designed as a slice, and a bucketCount. Slicing enables listing a large amount of items from a given point in time, in a
+fashion that is not achievable with a classic partition approach. The bucketCount enables sharding and prevents all writes
+at a given point in time from going to the same Cassandra partition. The clustering key is composed of an enqueueId - a
+unique identifier. The content holds the metadata of the email. This table makes it possible, from a starting date, to load all of
+the emails that have ever been in the mailQueue. Its content is never deleted.
+ - **deletedMailsV2** tells whether a mail stored in *enqueuedMailsV3* has been deleted or not. The queueName and
+enqueueId are used as primary key. This table is updated upon dequeue and deletes. This table is queried upon dequeue
+to filter out deleted/purged items.
+ - **browseStart** stores the latest known point in time from which all previous emails had been deleted/dequeued. It
+makes it possible to skip most deleted items upon browsing/deleting queue content. Its update is probability based and
+asynchronously piggybacked on dequeue.
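
As an illustration of the *enqueuedMailsV3* primary key described above, here is a minimal Python sketch of how a partition key could be derived. The function names are hypothetical, and deriving the bucket from a CRC32 hash of the enqueueId is an assumption made for the example: the ADR only states that a bucket is part of the key.

```python
from datetime import datetime, timedelta, timezone
import zlib

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def slice_start(enqueue_time: datetime, slice_window: timedelta) -> datetime:
    """Round an enqueue time down to the start of the slice containing it."""
    window_s = int(slice_window.total_seconds())
    elapsed_s = int((enqueue_time - EPOCH).total_seconds())
    return EPOCH + timedelta(seconds=elapsed_s - elapsed_s % window_s)


def bucket_of(enqueue_id: str, bucket_count: int) -> int:
    """Pick a bucket in [0, bucket_count).

    Assumption: a stable hash of the enqueueId spreads writes across buckets.
    """
    return zlib.crc32(enqueue_id.encode("utf-8")) % bucket_count


def partition_key(queue_name: str, enqueue_time: datetime, enqueue_id: str,
                  slice_window: timedelta, bucket_count: int):
    """(queue name, slice, bucket): mails sharing this triple share a partition."""
    return (queue_name,
            slice_start(enqueue_time, slice_window),
            bucket_of(enqueue_id, bucket_count))
```

Browsing from a given date then amounts to enumerating every (slice, bucket) pair from that slice onwards, which is why a higher bucket count costs more reads.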
+ 
+Here are the main mail operation sequences:
+
+ - Upon **enqueue** mail content is stored in the *object storage*, an entry is added in *enqueuedMailsV3* and a message
+ is fired on *rabbitMQ*.
+ - **dequeue** is triggered when a rabbitMQ message is received. *deletedMailsV2* is queried to know if the message has
+already been deleted. If not, the mail content is retrieved from the *object storage*, then an entry is added in
+*deletedMailsV2* to record that the email has been dequeued. A dequeue has a random probability to trigger a browse start
+update. If so, from current browse start, *enqueuedMailsV3* content is iterated, and checked against *deletedMailsV2*
+until the first non deleted / dequeued email is found. This point becomes the new browse start. BrowseStart can never
+point after the start of the current slice. A grace period upon browse start update is left to tolerate clock skew.
+Update of the browse start is done randomly as it is a simple way to avoid synchronisation in a distributed system: we
+ensure liveness, while unneeded browseStart updates would simply waste a few resources.
+ - Upon **browse**, *enqueuedMailsV3* content is iterated, and checked against *deletedMailsV2*, starting from the
+current browse start.
+ - Upon **delete/purge**, *enqueuedMailsV3* content is iterated, and checked against *deletedMailsV2*. Mails matching
+the condition are marked as deleted in *deletedMailsV2*.
+ - Upon **getSize**, we perform a browse and count the returned elements.
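
The operation sequences above can be sketched with a toy in-memory model. This is not the James implementation: the class and attribute names are hypothetical, plain Python collections stand in for Cassandra, the object storage and RabbitMQ, time is an integer, buckets and the clock-skew grace period are left out, and browse start advances one whole slice at a time.

```python
import random
from collections import defaultdict


class InMemoryMailQueueView:
    """Toy model of enqueuedMailsV3, deletedMailsV2 and browseStart."""

    def __init__(self, slice_window=10, update_probability=0.1, rng=None):
        self.slice_window = slice_window
        self.update_probability = update_probability
        self.rng = rng or random.Random()
        self.enqueued = defaultdict(list)  # slice start -> [(id, metadata)], never deleted
        self.deleted = set()               # ids that were dequeued or deleted
        self.browse_start = 0              # slice start from which browsing begins
        self.blobs = {}                    # stand-in for the object storage
        self.broker = []                   # stand-in for the RabbitMQ queue

    def _slice(self, time):
        return time - time % self.slice_window

    def enqueue(self, enqueue_id, metadata, content, now):
        self.blobs[enqueue_id] = content
        self.enqueued[self._slice(now)].append((enqueue_id, metadata))
        self.broker.append(enqueue_id)

    def dequeue(self, now):
        """Consume broker messages until one refers to a non-deleted mail."""
        while self.broker:
            enqueue_id = self.broker.pop(0)
            if enqueue_id in self.deleted:
                continue  # deleted or purged while waiting in the broker
            content = self.blobs[enqueue_id]
            self.deleted.add(enqueue_id)
            if self.rng.random() < self.update_probability:
                self.update_browse_start(now)
            return enqueue_id, content
        return None

    def update_browse_start(self, now):
        """Skip fully deleted slices, never moving past the current slice."""
        current = self._slice(now)
        candidate = self.browse_start
        while candidate < current and all(
                i in self.deleted for i, _ in self.enqueued.get(candidate, [])):
            candidate += self.slice_window
        self.browse_start = candidate

    def browse(self, now):
        for s in range(self.browse_start, self._slice(now) + 1, self.slice_window):
            for enqueue_id, metadata in self.enqueued.get(s, []):
                if enqueue_id not in self.deleted:
                    yield enqueue_id, metadata

    def delete(self, predicate, now):
        for enqueue_id, metadata in list(self.browse(now)):
            if predicate(metadata):
                self.deleted.add(enqueue_id)

    def get_size(self, now):
        return sum(1 for _ in self.browse(now))
```

Note that `get_size` walks every slice since the browse start, which is the cost discussed under Limitations.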
+ 
+The distributed mail queue requires a fine-tuned configuration, which mostly depends on the count of Cassandra servers,
+and on the mailQueue throughput:
+ - **sliceWindow** is the time period of a slice. All the elements of **enqueuedMailsV3** sharing the same slice are
+retrieved at once. The bigger it is, the more elements are read at once and the less frequent browse start updates
+will be. Lower values might result in many almost empty slices being read, generating higher read load. We recommend
+choosing **sliceWindow** from the users' maximum throughput so that a slice contains approximately 10,000 emails.
+Only values dividing the current *sliceWindow* are allowed as new values (otherwise previous slices might not be found).
+ - **bucketCount** enables spreading the writes in your Cassandra cluster using a bucketing strategy. Low values will
+lead to the workload not being spread evenly, higher values might result in unneeded reads upon browse. The count of Cassandra
+servers should be a good starting value. Only increasing the count of buckets is supported as a configuration update as
+decreasing the bucket count might result in some buckets being lost.
+ - **updateBrowseStartPace** governs the probability of updating browseStart upon dequeue/deletes. We recommend choosing
+a value guaranteeing a reasonable probability of updating the browse start every few slices. Too big values will lead to
+unneeded updates of not yet finished slices. Too low values will end up in a more expensive browseStart update and browse
+iterating through slices with all their content deleted. This value can be changed freely.
+
+We rely on eventSourcing to validate the mailQueue configuration changes upon James start following the aforementioned rules.
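
The configuration rules above can be expressed as a small check. The following Python sketch only restates those rules; the class, field and function names are hypothetical and do not reflect how the event sourcing based validation is written in James.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MailQueueViewConfiguration:
    slice_window_seconds: int
    bucket_count: int
    update_browse_start_pace: int


def validate_change(current, new):
    """Return the list of rule violations when moving from `current` to `new`."""
    errors = []
    # The new sliceWindow must divide the current one, otherwise previous
    # slices might not be found.
    if current.slice_window_seconds % new.slice_window_seconds != 0:
        errors.append("sliceWindow must divide the current sliceWindow")
    # Decreasing bucketCount might result in some buckets being lost.
    if new.bucket_count < current.bucket_count:
        errors.append("bucketCount can only be increased")
    # updateBrowseStartPace can be changed freely: nothing to check.
    return errors


def suggested_slice_window_seconds(max_mails_per_second,
                                   target_mails_per_slice=10_000):
    """Slice duration holding roughly 10,000 mails at peak throughput."""
    return max(1, round(target_mails_per_slice / max_mails_per_second))
```

For instance, at a peak of 10 mails per second the sizing rule gives a slice of about 1000 seconds, and moving from a 3600 second window to 1800 seconds is accepted while moving to 7200 seconds is not.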
+
+## Limitations
+
+Delays are not supported. This mail queue implementation is thus not suited for a Mail Exchange (MX) implementation.
+The [following proposal](https://issues.apache.org/jira/browse/JAMES-2896) could be a solution to support delays.
+
+**enqueuedMailsV3** and **deletedMailsV2** are never cleaned up and the corresponding blobs are always referenced. This is not
+ideal from both a privacy and a storage cost point of view.
+
+The **getSize** operation is inefficient. Combined with metric reporting of mail queue size being
+periodically performed by all James servers this can lead, upon increasing throughput, to a Cassandra overload. A configuration
+parameter allows disabling mail queue size reporting as a temporary solution. Some alternatives have been proposed, like
+[an eventually consistent per slice counters approach](https://github.com/linagora/james-project/pull/2565). Another
+proposed solution is [to rely on RabbitMQ management API to retrieve mail queue size](https://github.com/linagora/james-project/pull/2325)
+however by design it cannot take into account purge/delete operations. Read
+[the corresponding JIRA](https://issues.apache.org/jira/browse/JAMES-2733).
+
+## Consequences
+
+The distributed mail queue allows a better spreading of the mail processing workload. It enables a centralized mailQueue
+management for all James servers.
+
+Yet some additional work is required to use it in a Mail Exchange scenario.

