-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA256

Hi,

I found a possible data race in the current Cassandra implementation of
modseq generation. It is located in the cassandra subproject of the
james-mailbox project.

The previous implementation, relying on Cassandra counters, first
increments the modseq for the current mailbox in one request to
Cassandra, and then reads the new modseq value in a second, separate
request.

If two mails for the same mailbox are received on different servers at
the same time, it is possible that both servers perform the first
operation at the same time, taking the ModSeq value from n to n+2. During
the second operation, they will both read back the n+2 value, and both
mails will have the same modseq.



To solve the bug, I used Cassandra's lightweight transactions:
 - I created a table dedicated to modseq storage (I needed a separate
table because, in unit tests, modseq operations are used with
non-initialised mailboxes)
 - We ensure that an entry exists for this mailbox
 - We read and update the value (in two operations) until our
modification is applied (using Cassandra lightweight transactions)
 - I added a max retry parameter so that we cannot retry indefinitely.
Upon failure (too many tries), we throw a mailbox exception.



You will find the corresponding patch attached to this e-mail.

Sincerely yours,

Benoit Tellier
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v2

iQIcBAEBCAAGBQJUkcXGAAoJEKXeA9M8Xtuc+V0QAIHOpJLrcR+vTNpWsWqoC3Mr
PrjIXASsMVJPDAXbchS1sHS2u7B9XXvOGTSCVTYoB80VTKBzxX3CWaPwqEt/HiXD
FUnbdnfqpqZavAfviHAZD+p7HPgFRQWT7Fj+iF/ugZi0O74msx49GVmLKU/TzGpX
We7GESVMLfEFqLVZK3nnnL4b9HjW50HaodaWBRVhMaPkjZi274trH9elQRUqOaUC
uIxYbx7zzD6Qzg/S2LM40/iaiq/SqbYHgf30eJphKea8tguOmZSTFBYQ+SZc/smE
ZyYfc+zPSb9fiXsrJHpLnmUds6M0XpA6Ld9L517WDXhbB4hRjC31L4xPePwtDk0g
w8VN0tDIUCrGFY7xJCoL148h/+fYH2O7Sa1Uu3SkJpbPBdehv5IgrHWSkdr3oKGM
te4D9i1g1xO3pomtKXcu2L8NG7z7v43C4cLS/DlbVCGUkNb6dcrCX1FzvhjzDpJm
iJVrzbSgQ72nr6OsApbhCqH5bmAfX7ewsV/Jn0KY6CDnHI3o4jARyjJwuTeESJt8
kh06V3DbqX6YmSuRJVcIcoB9cUv/CqhSHNWqS+EwfVsYVvE/2ErukY5mhQrQ1CAx
BtA3ShN6iTGdYwf9nha+9FlHjZjhu/EI7JiuBd8FBpSUSZQytdnY1TUlHuRiDFal
ODCvx6DAi1jDecFO2x/A
=VOdc
-----END PGP SIGNATURE-----

diff -uNr james-mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java james-mailbox-new/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java
--- james-mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java	2014-12-17 16:04:47.600086583 +0100
+++ james-mailbox-new/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraSession.java	2014-12-17 18:58:56.847205247 +0100
@@ -55,10 +55,12 @@
         session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".mailbox (" + "id uuid PRIMARY KEY," + "name text, namespace text," + "uidvalidity bigint," + "user text," + "path text" + ");");
         session.execute("CREATE INDEX IF NOT EXISTS ON " + keyspace + ".mailbox(path);");
         session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".messageCounter (" + "mailboxId UUID PRIMARY KEY," + "nextUid bigint," + ");");
-        session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".mailboxCounters (" + "mailboxId UUID PRIMARY KEY," + "count counter," + "unseen counter," + "nextModSeq counter" + ");");
+        session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".mailboxCounters (" + "mailboxId UUID PRIMARY KEY," + "count counter," + "unseen counter," + ");");
         session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".message (" + "mailboxId UUID," + "uid bigint," + "internalDate timestamp," + "bodyStartOctet int," + "content blob," + "modSeq bigint," + "mediaType text," + "subType text," + "fullContentOctets int," + "bodyOctets int,"
                 + "textualLineCount bigint," + "bodyContent blob," + "headerContent blob," + "flagAnswered boolean," + "flagDeleted boolean," + "flagDraft boolean," + "flagRecent boolean," + "flagSeen boolean," + "flagFlagged boolean," + "flagUser boolean," + "PRIMARY KEY (mailboxId, uid)" + ");");
         session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".subscription (" + "user text," + "mailbox text," + "PRIMARY KEY (mailbox, user)" + ");");
+        session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".modseq (" + "mailboxId uuid PRIMARY KEY," + "nextModseq bigint" + ");");
+
         session.close();
     }
 
diff -uNr james-mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java james-mailbox-new/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
--- james-mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java	2014-12-17 16:04:47.600086583 +0100
+++ james-mailbox-new/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java	2014-12-17 18:58:56.847205247 +0100
@@ -19,13 +19,15 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.incr;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
-import static org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable.MAILBOX_ID;
-import static org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable.NEXT_MOD_SEQ;
-import static org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable.TABLE_NAME;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.NEXT_MODSEQ;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.TABLE_NAME;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.MAILBOX_ID;
+
 
 import java.util.UUID;
 
@@ -39,21 +41,49 @@
 
 public class CassandraModSeqProvider implements ModSeqProvider<UUID> {
 
+    private static final int DEFAULT_MAX_RETRY = 100000;
+
+
     private Session session;
+    private final int applied = 0;
+    private int maxRetry;
 
-    public CassandraModSeqProvider(Session session) {
+    public CassandraModSeqProvider(Session session, int maxRetry) {
         this.session = session;
+        this.maxRetry = maxRetry;
+    }
+
+    public CassandraModSeqProvider(Session session) {
+        this(session, DEFAULT_MAX_RETRY);
     }
 
     @Override
     public long nextModSeq(MailboxSession mailboxSession, Mailbox<UUID> mailbox) throws MailboxException {
-        session.execute(update(TABLE_NAME).with(incr(NEXT_MOD_SEQ)).where(eq(MAILBOX_ID, mailbox.getMailboxId())));
-        return highestModSeq(mailboxSession, mailbox);
+
+        long highestModSeq = highestModSeq(mailboxSession, mailbox);
+        if (highestModSeq == 0) {
+            ResultSet result = session.execute(insertInto(TABLE_NAME).value(NEXT_MODSEQ, highestModSeq+1).value(MAILBOX_ID, mailbox.getMailboxId()).ifNotExists());
+            if(result.one().getBool(applied)) {
+                return highestModSeq+1;
+            }
+        }
+        int tries = 0;
+        boolean isApplied;
+        do {
+            tries++;
+            highestModSeq = highestModSeq(mailboxSession, mailbox);
+            ResultSet result = session.execute(update(TABLE_NAME).onlyIf(eq(NEXT_MODSEQ, highestModSeq)).with(set(NEXT_MODSEQ, highestModSeq+1)).where(eq(MAILBOX_ID, mailbox.getMailboxId())));
+            isApplied = result.one().getBool(applied);
+        } while (! isApplied && tries < maxRetry);
+        if( ! isApplied ) {
+            throw new MailboxException("Can not obtain rights to manage UID");
+        }
+        return highestModSeq+1;
     }
 
     @Override
     public long highestModSeq(MailboxSession mailboxSession, Mailbox<UUID> mailbox) throws MailboxException {
-        ResultSet result = session.execute(select(NEXT_MOD_SEQ).from(TABLE_NAME).where(eq(MAILBOX_ID, mailbox.getMailboxId())));
-        return result.isExhausted() ? 0 : result.one().getLong(NEXT_MOD_SEQ);
+        ResultSet result = session.execute(select(NEXT_MODSEQ).from(TABLE_NAME).where(eq(MAILBOX_ID, mailbox.getMailboxId())));
+        return result.isExhausted() ? 0 : result.one().getLong(NEXT_MODSEQ);
     }
 }
diff -uNr james-mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxCountersTable.java james-mailbox-new/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxCountersTable.java
--- james-mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxCountersTable.java	2014-12-17 16:04:47.600086583 +0100
+++ james-mailbox-new/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMailboxCountersTable.java	2014-12-17 18:58:56.850538581 +0100
@@ -24,5 +24,4 @@
     String MAILBOX_ID = "mailboxId";
     String COUNT = "count";
     String UNSEEN = "unseen";
-    String NEXT_MOD_SEQ = "nextModSeq";
 }
\ No newline at end of file
diff -uNr james-mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageModseqTable.java james-mailbox-new/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageModseqTable.java
--- james-mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageModseqTable.java	1970-01-01 01:00:00.000000000 +0100
+++ james-mailbox-new/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageModseqTable.java	2014-12-17 18:58:56.850538581 +0100
@@ -0,0 +1,26 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.table;
+
+public interface CassandraMessageModseqTable {
+    String TABLE_NAME = "modseq";
+    String MAILBOX_ID = "mailboxId";
+    String NEXT_MODSEQ = "nextModseq";
+}
diff -uNr james-mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java james-mailbox-new/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java
--- james-mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java	2014-12-17 16:04:47.600086583 +0100
+++ james-mailbox-new/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java	2014-12-17 18:58:56.850538581 +0100
@@ -87,13 +87,15 @@
         } else if (tableName.equals("messageCounter")) {
             session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".messageCounter (" + "mailboxId UUID PRIMARY KEY," + "nextUid bigint," + ");");
         } else if (tableName.equals("mailboxCounters")) {
-            session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".mailboxCounters (" + "mailboxId UUID PRIMARY KEY," + "count counter," + "unseen counter," + "nextModSeq counter" + ");");
+            session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".mailboxCounters (" + "mailboxId UUID PRIMARY KEY," + "count counter," + "unseen counter," + ");");
         } else if (tableName.equals("message")) {
             session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".message (" + "mailboxId UUID," + "uid bigint," + "internalDate timestamp," + "bodyStartOctet int," + "content blob," + "modSeq bigint," + "mediaType text," + "subType text," + "fullContentOctets int,"
                     + "bodyOctets int," + "textualLineCount bigint," + "bodyContent blob," + "headerContent blob," + "flagAnswered boolean," + "flagDeleted boolean," + "flagDraft boolean," + "flagRecent boolean," + "flagSeen boolean," + "flagFlagged boolean," + "flagUser boolean,"
                     + "PRIMARY KEY (mailboxId, uid)" + ");");
         } else if (tableName.equals("subscription")) {
             session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".subscription (" + "user text," + "mailbox text," + "PRIMARY KEY (mailbox, user)" + ");");
+        } else if(tableName.equals("modseq")) {
+            session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".modseq (" + "mailboxId uuid PRIMARY KEY," + "nextModseq bigint" + ");");
         } else {
             throw new NotImplementedException("We don't support the class " + tableName);
         }
@@ -107,6 +109,7 @@
         ensureTable("mailboxCounters");
         ensureTable("message");
         ensureTable("subscription");
+        ensureTable("modseq");
     }
 
     /**
@@ -126,6 +129,7 @@
         clearTable("mailboxCounters");
         clearTable("message");
         clearTable("subscription");
+        clearTable("modseq");
     }
 
 }

Attachment: fix_cassandra_modseq_provider.patch.sig
Description: PGP signature

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to