Author: eric
Date: Thu May 28 15:43:09 2015
New Revision: 1682264

URL: http://svn.apache.org/r1682264
Log:
Improve the way Cassandra tables are created, patch contributed by Benoit Tellier (MAILBOX-226)

Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FunctionRunnerWithRetry.java james/mailbox/trunk/cassandra/src/main/resources/META-INF/spring/mailbox-cassandra.xml james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java james/mailbox/trunk/pom.xml Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java?rev=1682264&r1=1682263&r2=1682264&view=diff ============================================================================== --- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java (original) +++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java Thu May 28 15:43:09 2015 @@ -46,7 +46,7 @@ public class CassandraMailboxSessionMapp private ModSeqProvider<UUID> modSeqProvider; private int maxRetry; - public CassandraMailboxSessionMapperFactory(CassandraUidProvider uidProvider, ModSeqProvider<UUID> modSeqProvider, CassandraSession session) { + public CassandraMailboxSessionMapperFactory(CassandraUidProvider uidProvider, ModSeqProvider<UUID> modSeqProvider, Session session) { this.uidProvider = uidProvider; this.modSeqProvider = modSeqProvider; this.session = session; Modified: 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java?rev=1682264&r1=1682263&r2=1682264&view=diff ============================================================================== --- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java (original) +++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java Thu May 28 15:43:09 2015 @@ -25,7 +25,7 @@ import com.datastax.driver.core.Session; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import org.apache.james.mailbox.cassandra.CassandraSession; +import org.apache.james.mailbox.cassandra.CassandraConstants; import org.apache.james.mailbox.cassandra.mail.utils.SimpleMailboxACLJsonConverter; import org.apache.james.mailbox.cassandra.mail.utils.FunctionRunnerWithRetry; import org.apache.james.mailbox.cassandra.table.CassandraACLTable; @@ -92,7 +92,7 @@ public class CassandraACLMapper { .map((x) -> x.apply(command)) .map(this::updateStoredACL) .orElseGet(() -> insertACL(applyCommandOnEmptyACL(command))); - return resultSet.one().getBool(CassandraSession.LIGHTWEIGHT_TRANSACTION_APPLIED); + return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED); } ); } Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FunctionRunnerWithRetry.java URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FunctionRunnerWithRetry.java?rev=1682264&r1=1682263&r2=1682264&view=diff ============================================================================== --- 
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FunctionRunnerWithRetry.java (original) +++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FunctionRunnerWithRetry.java Thu May 28 15:43:09 2015 @@ -22,11 +22,17 @@ package org.apache.james.mailbox.cassand import com.google.common.base.Preconditions; import org.apache.james.mailbox.exception.MailboxException; +import java.util.Optional; import java.util.function.BooleanSupplier; import java.util.stream.IntStream; -public class FunctionRunnerWithRetry { - +public class FunctionRunnerWithRetry<Id> { + + @FunctionalInterface + public interface OptionalSupplier<Id> { + Optional<Id> getAsOptional(); + } + private final int maxRetry; public FunctionRunnerWithRetry(int maxRetry) { @@ -36,10 +42,17 @@ public class FunctionRunnerWithRetry { public void execute(BooleanSupplier functionNotifyingSuccess) throws MailboxException { IntStream.range(0, maxRetry) - .filter( - (x) -> functionNotifyingSuccess.getAsBoolean() - ).findFirst() + .filter((x) -> functionNotifyingSuccess.getAsBoolean()) + .findFirst() .orElseThrow(() -> new MailboxException("Can not execute Boolean Supplier.")); } - + + public Id executeAndRetrieveObject(OptionalSupplier<Id> functionNotifyingSuccess) throws MailboxException { + return IntStream.range(0, maxRetry) + .mapToObj((x) -> functionNotifyingSuccess.getAsOptional()) + .filter(Optional::isPresent) + .findFirst() + .orElseThrow(() -> new MailboxException("Can not execute Optional Supplier. 
" + maxRetry + " retries.")) + .get(); + } } Modified: james/mailbox/trunk/cassandra/src/main/resources/META-INF/spring/mailbox-cassandra.xml URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/resources/META-INF/spring/mailbox-cassandra.xml?rev=1682264&r1=1682263&r2=1682264&view=diff ============================================================================== --- james/mailbox/trunk/cassandra/src/main/resources/META-INF/spring/mailbox-cassandra.xml (original) +++ james/mailbox/trunk/cassandra/src/main/resources/META-INF/spring/mailbox-cassandra.xml Thu May 28 15:43:09 2015 @@ -50,17 +50,31 @@ <constructor-arg index="0" ref="cassandra-session"/> </bean> + <alias name="jvm-locker" alias="cassandra-locker"/> + <!-- - The parameters are : the IP of a Cassendra cluster, the port, the keyspace and the replication factor - Default values are : localhost, 9042, apache_james and 1 + The Cluster factory is responsible for connecting the cluster + + The ClusterWithKeyspaceCreatedFactory is responsible for creating the keyspace if not present. 
+ + The SessionFactory is responsible for giving a session we can work with --> - <bean id="cassandra-session" class="org.apache.james.mailbox.cassandra.CassandraSession"> + <bean id="cassandra-cluster" class="org.apache.james.mailbox.cassandra.ClusterFactory" factory-method="createClusterForSingleServerWithoutPassWord"> <constructor-arg index="0" value="localhost"/> <constructor-arg index="1" value="9042" type="int"/> - <constructor-arg index="2" value="apache_james"/> - <constructor-arg index="3" value="1" type="int"/> </bean> - <alias name="jvm-locker" alias="cassandra-locker"/> + <bean id="cassandra-cluster-initialized" class="org.apache.james.mailbox.cassandra.ClusterWithKeyspaceCreatedFactory" factory-method="clusterWithInitializedKeyspace"> + <constructor-arg index="0" ref="cassandra-cluster"/> + <constructor-arg index="1" value="apache_james"/> + <constructor-arg index="2" value="1" type="int"/> + </bean> + + <bean id="cassandra-session" class="org.apache.james.mailbox.cassandra.SessionFactory" factory-method=""> + <constructor-arg index="0" ref="cassandra-cluster-initialized"/> + <constructor-arg index="1" value="apache_james"/> + </bean> + + </beans> Modified: james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java?rev=1682264&r1=1682263&r2=1682264&view=diff ============================================================================== --- james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java (original) +++ james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraClusterSingleton.java Thu May 28 15:43:09 2015 @@ -18,23 +18,33 @@ ****************************************************************/ package org.apache.james.mailbox.cassandra; -import 
org.apache.commons.lang.NotImplementedException; + +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import com.google.common.base.Throwables; +import org.apache.james.mailbox.cassandra.mail.utils.FunctionRunnerWithRetry; import org.cassandraunit.utils.EmbeddedCassandraServerHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; + /** * Class that will creates a single instance of Cassandra session. */ public final class CassandraClusterSingleton { - private final static String CLUSTER_IP = "localhost"; - private final static int CLUSTER_PORT_TEST = 9142; - private final static String KEYSPACE_NAME = "apache_james"; - private final static int DEFAULT_REPLICATION_FACTOR = 1; + private static final String CLUSTER_IP = "localhost"; + private static final int CLUSTER_PORT_TEST = 9142; + private static final String KEYSPACE_NAME = "apache_james"; + private static final int REPLICATION_FACTOR = 1; + + private static final long SLEEP_BEFORE_RETRY = 200; + private static final int MAX_RETRY = 200; private static final Logger LOG = LoggerFactory.getLogger(CassandraClusterSingleton.class); private static CassandraClusterSingleton cluster = null; - private CassandraSession session; + private Session session; /** * Builds a MiniCluster instance. @@ -51,89 +61,45 @@ public final class CassandraClusterSingl } private CassandraClusterSingleton() throws RuntimeException { - try { + try { EmbeddedCassandraServerHelper.startEmbeddedCassandra(); - // Let Cassandra initialization before creating - // the session. Solve very fast computer tests run. 
- Thread.sleep(2000); - this.session = new CassandraSession(CLUSTER_IP, CLUSTER_PORT_TEST, KEYSPACE_NAME, DEFAULT_REPLICATION_FACTOR); - } catch (Exception e) { - throw new RuntimeException(e); + session = new FunctionRunnerWithRetry<Session>(MAX_RETRY) + .executeAndRetrieveObject(CassandraClusterSingleton.this::tryInitializeSession); + } catch(Exception exception) { + Throwables.propagate(exception); } } - /** - * Return a configuration for the runnning MiniCluster. - * - * @return - */ - public CassandraSession getConf() { + public Session getConf() { return session; } - /** - * Create a specific table. - * - * @param tableName - * the table name - */ - public void ensureTable(String tableName) { - if (tableName.equals("mailbox")) { - session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".mailbox (" + "id uuid PRIMARY KEY," + "name text, namespace text," + "uidvalidity bigint," + "user text," + "path text" + ");"); - - session.execute("CREATE INDEX IF NOT EXISTS ON " + session.getLoggedKeyspace() + ".mailbox(path);"); - } else if (tableName.equals("messageCounter")) { - session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".messageCounter (" + "mailboxId UUID PRIMARY KEY," + "nextUid bigint," + ");"); - } else if (tableName.equals("mailboxCounters")) { - session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".mailboxCounters (" + "mailboxId UUID PRIMARY KEY," + "count counter," + "unseen counter," + "nextModSeq counter" + ");"); - } else if (tableName.equals("message")) { - session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".message (" + "mailboxId UUID," + "uid bigint," + "internalDate timestamp," + "bodyStartOctet int," + "content blob," + "modSeq bigint," + "mediaType text," + "subType text," + "fullContentOctets int," - + "bodyOctets int," + "textualLineCount bigint," + "bodyContent blob," + "headerContent blob," + "flagAnswered boolean," + "flagDeleted 
boolean," + "flagDraft boolean," + "flagRecent boolean," + "flagSeen boolean," + "flagFlagged boolean," + "flagUser boolean," - + "flagVersion bigint,"+ "PRIMARY KEY (mailboxId, uid)" + ");"); - } else if (tableName.equals("subscription")) { - session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".subscription (" + "user text," + "mailbox text," + "PRIMARY KEY (mailbox, user)" + ");"); - } else if (tableName.equals("quota")) { - session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".quota (" - + "user text PRIMARY KEY," - + "size_quota counter," - + "count_quota counter" - + ");"); - } else if (tableName.equals("acl")) { - session.execute("CREATE TABLE IF NOT EXISTS " + session.getLoggedKeyspace() + ".acl (id uuid PRIMARY KEY, acl text, version bigint);"); - } else { - throw new NotImplementedException("We don't support the class " + tableName); - } + public void ensureAllTables() { + new CassandraTableManager(session).ensureAllTables(); } - /** - * Ensure all tables - */ - public void ensureAllTables() { - ensureTable("mailbox"); - ensureTable("mailboxCounters"); - ensureTable("message"); - ensureTable("subscription"); - ensureTable("acl"); + public void clearAllTables() { + new CassandraTableManager(session).clearAllTables(); } - /** - * Delete all rows from specified table. 
- * - * @param tableName - */ - public void clearTable(String tableName) { - session.execute("TRUNCATE " + tableName + ";"); + private Optional<Session> tryInitializeSession() { + try { + Cluster cluster = new ClusterFactory().createClusterForSingleServerWithoutPassWord(CLUSTER_IP, CLUSTER_PORT_TEST); + Cluster clusterWithInitializedKeyspace = new ClusterWithKeyspaceCreatedFactory() + .clusterWithInitializedKeyspace(cluster, KEYSPACE_NAME, REPLICATION_FACTOR); + return Optional.of(new SessionFactory().createSession(clusterWithInitializedKeyspace, KEYSPACE_NAME)); + } catch (NoHostAvailableException exception) { + sleep(SLEEP_BEFORE_RETRY); + return Optional.empty(); + } } - /** - * Delete all rows for all tables. - */ - public void clearAllTables() { - clearTable("mailbox"); - clearTable("mailboxCounters"); - clearTable("message"); - clearTable("subscription"); - clearTable("acl"); + private void sleep(long sleepMs) { + try { + Thread.sleep(sleepMs); + } catch(InterruptedException interruptedException) { + Throwables.propagate(interruptedException); + } } } Modified: james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java?rev=1682264&r1=1682263&r2=1682264&view=diff ============================================================================== --- james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java (original) +++ james/mailbox/trunk/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java Thu May 28 15:43:09 2015 @@ -22,7 +22,6 @@ import org.apache.james.mailbox.Abstract import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider; import org.apache.james.mailbox.cassandra.mail.CassandraUidProvider; -import 
org.apache.james.mailbox.cassandra.table.CassandraMailboxTable; import org.apache.james.mailbox.exception.BadCredentialsException; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.store.JVMMailboxPathLocker; @@ -36,7 +35,7 @@ import org.slf4j.LoggerFactory; */ public class CassandraMailboxManagerTest extends AbstractMailboxManagerTest { - private static final CassandraClusterSingleton CLUSTER = CassandraClusterSingleton.build(); + private static final CassandraClusterSingleton CASSANDRA = CassandraClusterSingleton.build(); /** * Setup the mailboxManager. @@ -45,8 +44,8 @@ public class CassandraMailboxManagerTest */ @Before public void setup() throws Exception { - CLUSTER.ensureAllTables(); - CLUSTER.clearAllTables(); + CASSANDRA.ensureAllTables(); + CASSANDRA.clearAllTables(); createMailboxManager(); } @@ -70,9 +69,9 @@ public class CassandraMailboxManagerTest */ @Override protected void createMailboxManager() throws MailboxException { - final CassandraUidProvider uidProvider = new CassandraUidProvider(CLUSTER.getConf()); - final CassandraModSeqProvider modSeqProvider = new CassandraModSeqProvider(CLUSTER.getConf()); - final CassandraMailboxSessionMapperFactory mapperFactory = new CassandraMailboxSessionMapperFactory(uidProvider, modSeqProvider, (CassandraSession) CLUSTER.getConf()); + final CassandraUidProvider uidProvider = new CassandraUidProvider(CASSANDRA.getConf()); + final CassandraModSeqProvider modSeqProvider = new CassandraModSeqProvider(CASSANDRA.getConf()); + final CassandraMailboxSessionMapperFactory mapperFactory = new CassandraMailboxSessionMapperFactory(uidProvider, modSeqProvider, CASSANDRA.getConf()); final CassandraMailboxManager manager = new CassandraMailboxManager(mapperFactory, null, new JVMMailboxPathLocker()); manager.init(); @@ -84,7 +83,7 @@ public class CassandraMailboxManagerTest private void deleteAllMailboxes() throws BadCredentialsException, MailboxException { MailboxSession session = 
getMailboxManager().createSystemSession("test", LoggerFactory.getLogger("Test")); - CLUSTER.clearTable(CassandraMailboxTable.TABLE_NAME); + CASSANDRA.clearAllTables(); session.close(); } } Modified: james/mailbox/trunk/pom.xml URL: http://svn.apache.org/viewvc/james/mailbox/trunk/pom.xml?rev=1682264&r1=1682263&r2=1682264&view=diff ============================================================================== --- james/mailbox/trunk/pom.xml (original) +++ james/mailbox/trunk/pom.xml Thu May 28 15:43:09 2015 @@ -119,7 +119,7 @@ <junit.version>4.11</junit.version> <jasypt.version>1.9.0</jasypt.version> <guava.version>13.0</guava.version> - <cassandra-driver-core.version>2.0.1</cassandra-driver-core.version> + <cassandra-driver-core.version>2.1.5</cassandra-driver-core.version> <cassandra-unit.version>2.0.2.1</cassandra-unit.version> <assertj.version>2.0.0</assertj.version> <jackson-databinding.version>2.3.3</jackson-databinding.version> --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org