JAMES-1945 Parallelize UID update in CassandraUidProvider
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/6d94a8fe Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/6d94a8fe Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/6d94a8fe Branch: refs/heads/master Commit: 6d94a8fe624160ee0dc0e42ed8be51c570347cc0 Parents: f2b7cd7 Author: Benoit Tellier <[email protected]> Authored: Mon Feb 20 11:58:13 2017 +0700 Committer: benwa <[email protected]> Committed: Thu Feb 23 10:38:02 2017 +0700 ---------------------------------------------------------------------- .../cassandra/mail/CassandraUidProvider.java | 145 +++++++++++-------- .../CassandraSubscriptionManagerTest.java | 7 +- 2 files changed, 87 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/6d94a8fe/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java index a6bd980..4aeb483 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java @@ -19,46 +19,71 @@ package org.apache.james.mailbox.cassandra.mail; +import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import static com.datastax.driver.core.querybuilder.QueryBuilder.set; import static 
com.datastax.driver.core.querybuilder.QueryBuilder.update; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.MAILBOX_ID; import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.NEXT_UID; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.TABLE_NAME; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import javax.inject.Inject; -import org.apache.james.backends.cassandra.utils.CassandraConstants; +import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry; -import org.apache.james.backends.cassandra.utils.LightweightTransactionException; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.cassandra.CassandraId; -import org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.store.mail.UidProvider; import org.apache.james.mailbox.store.mail.model.Mailbox; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.james.util.OptionalConverter; -import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; -import com.datastax.driver.core.querybuilder.BuiltStatement; -import com.google.common.base.Throwables; public class CassandraUidProvider implements UidProvider { - public final static int DEFAULT_MAX_RETRY = 100000; - private static final Logger LOG = LoggerFactory.getLogger(CassandraUidProvider.class); + private static final int DEFAULT_MAX_RETRY = 100000; + private static final String CONDITION = "Condition"; - private final Session session; + private final CassandraAsyncExecutor executor; private final FunctionRunnerWithRetry runner; + private final 
PreparedStatement insertStatement; + private final PreparedStatement updateStatement; + private final PreparedStatement selectStatement; public CassandraUidProvider(Session session, int maxRetry) { - this.session = session; + this.executor = new CassandraAsyncExecutor(session); this.runner = new FunctionRunnerWithRetry(maxRetry); + this.selectStatement = prepareSelect(session); + this.updateStatement = prepareUpdate(session); + this.insertStatement = prepareInsert(session); + } + + private PreparedStatement prepareSelect(Session session) { + return session.prepare(select(NEXT_UID) + .from(TABLE_NAME) + .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID)))); + } + + private PreparedStatement prepareUpdate(Session session) { + return session.prepare(update(TABLE_NAME) + .onlyIf(eq(NEXT_UID, bindMarker(CONDITION))) + .with(set(NEXT_UID, bindMarker(NEXT_UID))) + .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID)))); + } + + private PreparedStatement prepareInsert(Session session) { + return session.prepare(insertInto(TABLE_NAME) + .value(NEXT_UID, MessageUid.MIN_VALUE.asLong()) + .value(MAILBOX_ID, bindMarker(MAILBOX_ID)) + .ifNotExists()); } @Inject @@ -74,75 +99,67 @@ public class CassandraUidProvider implements UidProvider { @Override public MessageUid nextUid(MailboxSession session, MailboxId mailboxId) throws MailboxException { CassandraId cassandraId = (CassandraId) mailboxId; - if (! 
findHighestUid(cassandraId).isPresent()) { - Optional<MessageUid> optional = tryInsertUid(cassandraId, Optional.empty()); - if (optional.isPresent()) { - return optional.get(); - } - } + return nextUid(cassandraId) + .join() + .orElseThrow(() -> new MailboxException("Error during Uid update")); + } - try { - return runner.executeAndRetrieveObject( - () -> { - try { - return tryUpdateUid(cassandraId, findHighestUid(cassandraId)); - } catch (Exception exception) { - LOG.error("Can not retrieve next Uid", exception); - throw Throwables.propagate(exception); - } - }); - } catch (LightweightTransactionException e) { - throw new MailboxException("Error during Uid update", e); - } + public CompletableFuture<Optional<MessageUid>> nextUid(CassandraId cassandraId) { + return findHighestUid(cassandraId) + .thenCompose(optional -> { + if (optional.isPresent()) { + return tryUpdateUid(cassandraId, optional); + } + return tryInsert(cassandraId); + }) + .thenCompose(optional -> { + if (optional.isPresent()) { + return CompletableFuture.completedFuture(optional); + } + return runner.executeAsyncAndRetrieveObject( + () -> findHighestUid(cassandraId) + .thenCompose(readUid -> tryUpdateUid(cassandraId, readUid))); + }); } @Override public com.google.common.base.Optional<MessageUid> lastUid(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException { - return findHighestUid((CassandraId) mailbox.getMailboxId()); - } - - private com.google.common.base.Optional<MessageUid> findHighestUid(CassandraId mailboxId) throws MailboxException { - ResultSet result = session.execute( - select(NEXT_UID) - .from(CassandraMessageUidTable.TABLE_NAME) - .where(eq(CassandraMessageUidTable.MAILBOX_ID, mailboxId.asUuid()))); - if (result.isExhausted()) { - return com.google.common.base.Optional.absent(); - } else { - return com.google.common.base.Optional.of(MessageUid.of(result.one().getLong(NEXT_UID))); - } + return OptionalConverter.toGuava(findHighestUid((CassandraId) 
mailbox.getMailboxId()).join()); } - private Optional<MessageUid> tryInsertUid(CassandraId mailboxId, Optional<MessageUid> uid) { - MessageUid nextUid = uid.map(MessageUid::next).orElse(MessageUid.MIN_VALUE); - return transactionalStatementToOptionalUid(nextUid, - insertInto(CassandraMessageUidTable.TABLE_NAME) - .value(NEXT_UID, nextUid.asLong()) - .value(CassandraMessageUidTable.MAILBOX_ID, mailboxId.asUuid()) - .ifNotExists()); + private CompletableFuture<Optional<MessageUid>> findHighestUid(CassandraId mailboxId) { + return executor.executeSingleRow( + selectStatement.bind() + .setUUID(MAILBOX_ID, mailboxId.asUuid())) + .thenApply(optional -> optional.map(row -> MessageUid.of(row.getLong(NEXT_UID)))); } - private Optional<MessageUid> tryUpdateUid(CassandraId mailboxId, com.google.common.base.Optional<MessageUid> uid) { + private CompletableFuture<Optional<MessageUid>> tryUpdateUid(CassandraId mailboxId, Optional<MessageUid> uid) { if (uid.isPresent()) { MessageUid nextUid = uid.get().next(); - return transactionalStatementToOptionalUid(nextUid, - update(CassandraMessageUidTable.TABLE_NAME) - .onlyIf(eq(NEXT_UID, uid.get().asLong())) - .with(set(NEXT_UID, nextUid.asLong())) - .where(eq(CassandraMessageUidTable.MAILBOX_ID, mailboxId.asUuid()))); + return executor.executeReturnApplied( + updateStatement.bind() + .setUUID(MAILBOX_ID, mailboxId.asUuid()) + .setLong(CONDITION, uid.get().asLong()) + .setLong(NEXT_UID, nextUid.asLong())) + .thenApply(success -> successToUid(nextUid, success)); } else { - return transactionalStatementToOptionalUid(MessageUid.MIN_VALUE, - update(CassandraMessageUidTable.TABLE_NAME) - .onlyIf(eq(NEXT_UID, null)) - .with(set(NEXT_UID, MessageUid.MIN_VALUE.asLong())) - .where(eq(CassandraMessageUidTable.MAILBOX_ID, mailboxId.asUuid()))); + return tryInsert(mailboxId); } } - private Optional<MessageUid> transactionalStatementToOptionalUid(MessageUid uid, BuiltStatement statement) { - 
if(session.execute(statement).one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED)) { + private CompletableFuture<Optional<MessageUid>> tryInsert(CassandraId mailboxId) { + return executor.executeReturnApplied( + insertStatement.bind() + .setUUID(MAILBOX_ID, mailboxId.asUuid())) + .thenApply(success -> successToUid(MessageUid.MIN_VALUE, success)); + } + + private Optional<MessageUid> successToUid(MessageUid uid, Boolean success) { + if (success) { return Optional.of(uid); } return Optional.empty(); } + } http://git-wip-us.apache.org/repos/asf/james-project/blob/6d94a8fe/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java index b0b334e..0a3838d 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java @@ -34,7 +34,9 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO; import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider; import org.apache.james.mailbox.cassandra.mail.CassandraUidProvider; import org.apache.james.mailbox.cassandra.modules.CassandraMailboxCounterModule; +import org.apache.james.mailbox.cassandra.modules.CassandraModSeqModule; import org.apache.james.mailbox.cassandra.modules.CassandraSubscriptionModule; +import org.apache.james.mailbox.cassandra.modules.CassandraUidModule; /** * Test Cassandra subscription against some general purpose written code. 
@@ -43,7 +45,10 @@ public class CassandraSubscriptionManagerTest extends AbstractSubscriptionManage private static final CassandraCluster cassandra = CassandraCluster.create( new CassandraModuleComposite( - new CassandraSubscriptionModule(), new CassandraMailboxCounterModule())); + new CassandraSubscriptionModule(), + new CassandraMailboxCounterModule(), + new CassandraUidModule(), + new CassandraModSeqModule())); @Override public SubscriptionManager createSubscriptionManager() { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
