This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5b3080257868e7ee8f9304754ac9de6fe9fabaf1
Author: Benoit Tellier <[email protected]>
AuthorDate: Mon Apr 13 12:03:17 2020 +0700

    [ADR] Distributed Mail Queue
---
 src/adr/0031-distributed-mail-queue.md | 118 +++++++++++++++++++++++++++++++++
 1 file changed, 118 insertions(+)

diff --git a/src/adr/0031-distributed-mail-queue.md b/src/adr/0031-distributed-mail-queue.md
new file mode 100644
index 0000000..42c6691
--- /dev/null
+++ b/src/adr/0031-distributed-mail-queue.md
@@ -0,0 +1,118 @@
+# 31. Distributed Mail Queue
+
+Date: 2020-04-13
+
+## Status
+
+Accepted (lazy consensus)
+
+## Context
+
+MailQueue is a central component of SMTP infrastructure allowing asynchronous mail processing. This enables a short
+SMTP reply time despite a potentially longer mail processing time. It also works as a buffer during SMTP peak workload
+so as not to overload a server.
+
+Furthermore, when used as a Mail Exchange server (MX), the ability to add delays to be observed before dequeuing elements
+allows, among others:
+
+ - Delaying retries upon MX delivery failure to a remote site.
+ - Throttling, which could be helpful for not being considered a spammer.
+
+A mail queue also enables advanced administration operations like traffic review, discarding emails, resetting wait
+delays, purging the queue, etc.
+
+The Spring implementation and the non-distributed implementations rely on an embedded ActiveMQ to implement the MailQueue.
+Emails are stored in a local file system. An administrator wishing to administer the mailQueue thus needs
+to interact with every James server, which is impractical in a distributed setup.
+
+Distributed James relies on the following third-party software (among others):
+
+ - **RabbitMQ** for messaging. It is good at holding a queue, but some advanced administrative operations can't be
+implemented with this component alone. This is the case for `browse`, `getSize` and `arbitrary mail removal`.
+ - **Cassandra** is the metadata database. Because deletes are implemented with **tombstones**, using it as a queue is a well-known anti-pattern.
+ - **ObjectStorage** (Swift or S3) holds byte content.
+
+## Decision
+
+Distributed James should ship a distributed MailQueue composed of the following software, with the following
+responsibilities:
+
+ - **RabbitMQ** for messaging. A RabbitMQ consumer will trigger dequeue operations.
+ - A time series projection of the queue content (a time-ordered list of mail metadata) will be maintained in **Cassandra** (see below). Time series avoid the
+aforementioned tombstone anti-pattern, and no polling is performed on this projection.
+ - **ObjectStorage** (Swift or S3) holds large byte content. This avoids overwhelming other software that does not scale
+ as well in terms of input/output operations per second.
+
+Here are the details of the tables composing the Cassandra MailQueue View data model:
+
+ - **enqueuedMailsV3** holds the time series. The primary key holds the queue name, the (rounded) time of enqueue,
+referred to as a slice, and a bucketCount. Slicing enables listing a large number of items from a given point in time, in a
+fashion that is not achievable with a classic partition approach. The bucketCount enables sharding and prevents all writes
+at a given point in time from going to the same Cassandra partition. The clustering key is composed of an enqueueId - a
+unique identifier. The content holds the metadata of the email. From a starting date, this table allows loading all of
+the emails that have ever been in the mailQueue. Its content is never deleted.
+ - **deletedMailsV2** tells whether a mail stored in *enqueuedMailsV3* has been deleted or not. The queueName and
+enqueueId are used as primary key. This table is updated upon dequeue and deletes. This table is queried upon dequeue
+to filter out deleted/purged items.
+ - **browseStart** stores the latest known point in time before which all emails have been deleted/dequeued. It
+allows skipping most deleted items upon browsing/deleting queue content. Its update is probability-based and
+asynchronously piggybacked on dequeue.
+
+Here are the main mail operation sequences:
+
+ - Upon **enqueue**, mail content is stored in the *object storage*, an entry is added in *enqueuedMailsV3* and a message
+ is fired on *rabbitMQ*.
+ - **dequeue** is triggered by the reception of a rabbitMQ message. *deletedMailsV2* is queried to know if the message has
+already been deleted. If not, the mail content is retrieved from the *object storage*, then an entry is added in
+*deletedMailsV2* to record that the email has been dequeued. A dequeue has a random probability of triggering a browse start
+update. If so, from the current browse start, *enqueuedMailsV3* content is iterated, and checked against *deletedMailsV2*
+until the first non deleted / dequeued email is found. This point becomes the new browse start. BrowseStart can never
+point after the start of the current slice. A grace period upon browse start update is left to tolerate clock skew.
+Update of the browse start is done randomly as it is a simple way to avoid synchronisation in a distributed system: we
+ensure liveness, while unneeded browseStart updates would simply waste a few resources.
+ - Upon **browse**, *enqueuedMailsV3* content is iterated, and checked against *deletedMailsV2*, starting from the
+current browse start.
+ - Upon **delete/purge**, *enqueuedMailsV3* content is iterated, and checked against *deletedMailsV2*. Mails matching
+the condition are marked as deleted in *deletedMailsV2*.
+ - Upon **getSize**, we perform a browse and count the returned elements.
+
+The distributed mail queue requires a fine-tuned configuration, which mostly depends on the count of Cassandra servers
+and on the mailQueue throughput:
+ - **sliceWindow** is the time period of a slice. All the elements of **enqueuedMailsV3** sharing the same slice are
+retrieved at once. The bigger it is, the more elements are read at once and the less frequent browse start updates
+will be. Lower values might result in many almost empty slices being read, generating a higher read load. We recommend
+choosing **sliceWindow** from the users' maximum throughput so that a slice contains approximately 10,000 emails.
+Only values dividing the current *sliceWindow* are allowed as new values (otherwise previous slices might not be found).
+ - **bucketCount** enables spreading the writes in your Cassandra cluster using a bucketing strategy. Low values will
+lead to the workload not being spread evenly; higher values might result in unneeded reads upon browse. The count of Cassandra
+servers should be a good starting value. Only increasing the count of buckets is supported as a configuration update, as
+decreasing the bucket count might result in some buckets being lost.
+ - **updateBrowseStartPace** governs the probability of updating browseStart upon dequeue/deletes. We recommend choosing
+a value guaranteeing a reasonable probability of updating the browse start every few slices. Values that are too high will lead to
+unneeded updates of not yet finished slices. Values that are too low will end up in a more expensive browseStart update and in browse
+iterating through slices with all their content deleted. This value can be changed freely.
+
+We rely on eventSourcing to validate mailQueue configuration changes upon James start, following the aforementioned rules.
+
+## Limitations
+
+Delays are not supported. This mail queue implementation is thus not suited for a Mail Exchange (MX) implementation.
+The [following proposal](https://issues.apache.org/jira/browse/JAMES-2896) could be a solution to support delays.
+
+**enqueuedMailsV3** and **deletedMailsV2** are never cleaned up and the corresponding blobs are always referenced. This is not
+ideal from both a privacy and a storage cost point of view.
+
+The **getSize** operation is sub-optimal, as it requires a full browse. Combined with metric reporting of mail queue size being
+periodically performed by all James servers, this can lead, upon increasing throughput, to a Cassandra overload. A configuration
+parameter allows disabling mail queue size reporting as a temporary solution. Some alternatives have been proposed, like
+[an eventually consistent per slice counters approach](https://github.com/linagora/james-project/pull/2565). Another
+proposed solution is [to rely on RabbitMQ management API to retrieve mail queue size](https://github.com/linagora/james-project/pull/2325)
+however by design it cannot take into account purge/delete operations. Read
+[the corresponding JIRA](https://issues.apache.org/jira/browse/JAMES-2733).
+
+## Consequences
+
+Distributed mail queue allows a better spreading of mail processing workload. It enables centralized mailQueue
+management for all James servers.
+
+Yet some additional work is required to use it in a Mail Exchange scenario.
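To make the slice and bucket scheme of *enqueuedMailsV3* described in the ADR more concrete, here is a minimal Python sketch of how a partition key could be derived and which partitions a browse would have to read. It is not the James implementation: the function names are hypothetical, and picking the bucket from a CRC32 hash of the enqueueId is an assumption, since the ADR does not say how a bucket is chosen.

```python
from datetime import datetime, timedelta, timezone
import zlib


def slice_start(enqueue_time, slice_window):
    """Round a timezone-aware enqueue time down to the start of its slice."""
    window = int(slice_window.total_seconds())
    epoch = int(enqueue_time.timestamp())
    return datetime.fromtimestamp(epoch - epoch % window, tz=timezone.utc)


def bucket_of(enqueue_id, bucket_count):
    # Assumption: a stable hash of the enqueueId selects the bucket.
    return zlib.crc32(enqueue_id.encode("utf-8")) % bucket_count


def partition_key(queue_name, enqueue_time, enqueue_id, slice_window, bucket_count):
    """(queue name, slice, bucket) triple used as the partition key in this sketch."""
    return (
        queue_name,
        slice_start(enqueue_time, slice_window),
        bucket_of(enqueue_id, bucket_count),
    )


def partitions_to_browse(queue_name, browse_start, now, slice_window, bucket_count):
    """List every (queue, slice, bucket) partition between browse start and now."""
    current = slice_start(browse_start, slice_window)
    last = slice_start(now, slice_window)
    keys = []
    while current <= last:
        for bucket in range(bucket_count):
            keys.append((queue_name, current, bucket))
        current += slice_window
    return keys
```

The sketch shows why both parameters matter for reads: a browse touches one partition per slice and per bucket, so a small sliceWindow or a large bucketCount multiplies the number of partitions read.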

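The operation sequences of the ADR (enqueue, dequeue, browse, delete/purge, getSize and the probabilistic browse start update) can be modelled in memory as a rough sketch. Everything here is hypothetical: the class and method names are invented, plain Python collections stand in for the Cassandra tables, slices are plain integers, the object storage and RabbitMQ are left out, and the grace period for clock skew is omitted. The sketch takes an update probability directly, because the ADR does not specify how *updateBrowseStartPace* maps to a probability.

```python
import random


class InMemoryMailQueueView:
    """Toy model of the mail queue view; not the James implementation."""

    def __init__(self, update_probability=0.1, rng=None):
        self.enqueued = []      # stands in for enqueuedMailsV3: append-only rows
        self.deleted = set()    # stands in for deletedMailsV2
        self.browse_start = 0   # stands in for browseStart (a slice number)
        self.update_probability = update_probability  # assumed parameter
        self.rng = rng or random.Random()

    def enqueue(self, slice_number, enqueue_id):
        # Rows are only ever appended, never removed.
        self.enqueued.append((slice_number, enqueue_id))

    def browse(self):
        # Iterate from the browse start and filter against the deleted set.
        return [
            enqueue_id
            for slice_number, enqueue_id in sorted(self.enqueued)
            if slice_number >= self.browse_start and enqueue_id not in self.deleted
        ]

    def get_size(self):
        # getSize is a full browse followed by a count.
        return len(self.browse())

    def delete(self, predicate):
        # Matching mails are marked in the deleted set; enqueued rows stay.
        for enqueue_id in self.browse():
            if predicate(enqueue_id):
                self.deleted.add(enqueue_id)

    def dequeue(self, enqueue_id, current_slice):
        """Handle one delivered message; False means it was already deleted."""
        if enqueue_id in self.deleted:
            return False
        self.deleted.add(enqueue_id)
        if self.rng.random() < self.update_probability:
            self.update_browse_start(current_slice)
        return True

    def update_browse_start(self, current_slice):
        # Move to the first slice still holding a live mail, never past the current slice.
        live = [
            slice_number
            for slice_number, enqueue_id in self.enqueued
            if slice_number >= self.browse_start and enqueue_id not in self.deleted
        ]
        candidate = min(live) if live else current_slice
        self.browse_start = min(candidate, current_slice)
```

With `update_probability=1.0` every dequeue refreshes the browse start, which is convenient for experimenting; lower values trade fewer updates for browses that iterate over more fully deleted slices.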